diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 6122e27d866..44b3bfe4b83 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -55,6 +55,7 @@ import io.druid.common.guava.GuavaUtils; import io.druid.common.utils.JodaUtils; import io.druid.common.utils.SerializerUtils; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ValueType; @@ -216,8 +217,11 @@ public class IndexMerger ProgressIndicator progress ) throws IOException { - return merge( - Lists.transform( + // We are materializing the list for performance reasons. Lists.transform + // only creates a "view" of the original list, meaning the function gets + // applied every time you access an element. + List indexAdapteres = Lists.newArrayList( + Iterables.transform( indexes, new Function() { @@ -227,7 +231,10 @@ public class IndexMerger return new QueryableIndexIndexableAdapter(input); } } - ), + ) + ); + return merge( + indexAdapteres, metricAggs, outDir, null, @@ -842,13 +849,17 @@ public class IndexMerger tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); } + BitmapIndexSeeker[] bitmapIndexSeeker = new BitmapIndexSeeker[indexes.size()]; + for (int j = 0; j < indexes.size(); j++) { + bitmapIndexSeeker[j] = indexes.get(j).getBitmapIndexSeeker(dimension); + } for (String dimVal : IndexedIterable.create(dimVals)) { progress.progress(); List> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size()); for (int j = 0; j < indexes.size(); ++j) { convertedInverteds.add( new ConvertingIndexedInts( - indexes.get(j).getBitmapIndex(dimension, dimVal), rowNumConversions.get(j) + bitmapIndexSeeker[j].seek(dimVal), rowNumConversions.get(j) ) ); } @@ -998,6 +1009,7 @@ public class IndexMerger private int currIndex; private String lastVal = null; + private String currValue; DimValueConverter( Indexed dimSet @@ -1007,6 +1019,7 @@ public class IndexMerger conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer(); currIndex = 0; + currValue = null; } public void convert(String value, int index) @@ -1020,7 +1033,9 @@ public class IndexMerger } return; } - String currValue = dimSet.get(currIndex); + if (currValue == null) { + currValue = dimSet.get(currIndex); + } while (currValue == null) { conversionBuf.position(conversionBuf.position() + 1); @@ -1037,6 +1052,8 @@ public class IndexMerger ++currIndex; if (currIndex == dimSet.size()) { lastVal = value; + } else { + currValue = dimSet.get(currIndex); } } else if (currValue.compareTo(value) < 0) { throw new ISE( diff --git a/processing/src/main/java/io/druid/segment/IndexableAdapter.java b/processing/src/main/java/io/druid/segment/IndexableAdapter.java index 3c09d7a3155..94f0fbe5700 100644 --- a/processing/src/main/java/io/druid/segment/IndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/IndexableAdapter.java @@ -17,6 +17,7 @@ package io.druid.segment; +import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; @@ -41,6 +42,8 @@ public interface IndexableAdapter IndexedInts getBitmapIndex(String dimension, String value); + BitmapIndexSeeker getBitmapIndexSeeker(String dimension); + String getMetricType(String metric); ColumnCapabilities getCapabilities(String column); diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java index 5d64edea45b..a365850e6a8 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java @@ -27,10 +27,12 @@ import com.metamx.common.ISE; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.logger.Logger; import io.druid.segment.column.BitmapIndex; +import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ComplexColumn; import io.druid.segment.column.DictionaryEncodedColumn; +import io.druid.segment.column.EmptyBitmapIndexSeeker; import io.druid.segment.column.GenericColumn; import io.druid.segment.column.IndexedFloatsGenericColumn; import io.druid.segment.column.IndexedLongsGenericColumn; @@ -38,6 +40,7 @@ import io.druid.segment.column.ValueType; import io.druid.segment.data.ArrayBasedIndexedInts; import io.druid.segment.data.BitmapCompressedIndexedInts; import io.druid.segment.data.EmptyIndexedInts; +import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedIterable; @@ -331,4 +334,76 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter { return input.getColumn(column).getCapabilities(); } + + @Override + public BitmapIndexSeeker getBitmapIndexSeeker(String dimension) + { + final Column column = input.getColumn(dimension); + + if (column == null) { + return new EmptyBitmapIndexSeeker(); + } + + final BitmapIndex bitmaps = column.getBitmapIndex(); + if (bitmaps == null) { + return new EmptyBitmapIndexSeeker(); + } + + final Indexed dimSet = getDimValueLookup(dimension); + + // BitmapIndexSeeker is the main performance boost comes from. + // In the previous version of index merge, during the creation of invert index, we do something like + // merge sort of multiply bitmap indexes. It simply iterator all the previous sorted values, + // and "binary find" the id in each bitmap indexes, which involves disk IO and is really slow. + // Suppose we have N (which is 100 in our test) small segments, each have M (which is 50000 in our case) rows. + // In high cardinality scenario, we will almost have N * M uniq values. So the complexity will be O(N * M * M * LOG(M)). + + // There are 2 properties we did not use during the merging: + // 1. We always travel the dimension values sequentially + // 2. One single dimension value is valid only in one index when cardinality is high enough + // So we introduced the BitmapIndexSeeker, which can only seek value sequentially and can never seek back. + // By using this and the help of "getDimValueLookup", we only need to translate all dimension value to its ID once, + // and the translation is done by self-increase of the integer. We only need to change the CACHED value once after + // previous value is hit, renew the value and increase the ID. The complexity now is O(N * M * LOG(M)). + return new BitmapIndexSeeker() + { + private int currIndex = 0; + private String currVal = null; + private String lastVal = null; + + @Override + public IndexedInts seek(String value) + { + if (dimSet == null || dimSet.size() == 0) { + return new EmptyIndexedInts(); + } + if (lastVal != null) { + if (GenericIndexed.STRING_STRATEGY.compare(value, lastVal) <= 0) { + throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", + value, lastVal); + } + return new EmptyIndexedInts(); + } + if (currVal == null) { + currVal = dimSet.get(currIndex); + } + int compareResult = GenericIndexed.STRING_STRATEGY.compare(currVal, value); + if (compareResult == 0) { + IndexedInts ret = new BitmapCompressedIndexedInts(bitmaps.getBitmap(currIndex)); + ++currIndex; + if (currIndex == dimSet.size()) { + lastVal = value; + } else { + currVal = dimSet.get(currIndex); + } + return ret; + } else if (compareResult < 0) { + throw new ISE("Skipped currValue[%s], currIndex[%,d]; incoming value[%s]", + currVal, currIndex, value); + } else { + return new EmptyIndexedInts(); + } + } + }; + } } diff --git a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java index bc4059a836f..b5046724224 100644 --- a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java @@ -21,6 +21,7 @@ package io.druid.segment; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; @@ -92,4 +93,10 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter { return baseAdapter.getCapabilities(column); } + + @Override + public BitmapIndexSeeker getBitmapIndexSeeker(String dimension) + { + return baseAdapter.getBitmapIndexSeeker(dimension); + } } diff --git a/processing/src/main/java/io/druid/segment/column/BitmapIndexSeeker.java b/processing/src/main/java/io/druid/segment/column/BitmapIndexSeeker.java new file mode 100644 index 00000000000..6b369bce080 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/column/BitmapIndexSeeker.java @@ -0,0 +1,28 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.column; + +import io.druid.segment.data.IndexedInts; + +/** + * Only support access in order + */ +public interface BitmapIndexSeeker +{ + public IndexedInts seek(String value); +} diff --git a/processing/src/main/java/io/druid/segment/column/EmptyBitmapIndexSeeker.java b/processing/src/main/java/io/druid/segment/column/EmptyBitmapIndexSeeker.java new file mode 100644 index 00000000000..9756f9fae21 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/column/EmptyBitmapIndexSeeker.java @@ -0,0 +1,32 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.column; + +import io.druid.segment.data.EmptyIndexedInts; +import io.druid.segment.data.IndexedInts; + +public class EmptyBitmapIndexSeeker implements BitmapIndexSeeker +{ + + @Override + public IndexedInts seek(String value) + { + return new EmptyIndexedInts(); + } + +} diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index d593a68f1dc..2a6399de3f6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -22,21 +22,29 @@ package io.druid.segment.incremental; import com.google.common.base.Function; import com.google.common.collect.Maps; import com.metamx.collections.bitmap.BitmapFactory; +import com.metamx.collections.bitmap.ImmutableBitmap; import com.metamx.collections.bitmap.MutableBitmap; +import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; + import io.druid.segment.IndexableAdapter; import io.druid.segment.Rowboat; +import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.EmptyBitmapIndexSeeker; import io.druid.segment.data.EmptyIndexedInts; +import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.ListIndexed; + import org.joda.time.Interval; import org.roaringbitmap.IntIterator; import javax.annotation.Nullable; + import java.io.IOException; import java.util.Iterator; import java.util.Map; @@ -235,59 +243,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter return new EmptyIndexedInts(); } - return new IndexedInts() - { - @Override - public int size() - { - return bitmapIndex.size(); - } - - @Override - public int get(int index) - { - throw new UnsupportedOperationException("This is really slow, so it's just not supported."); - } - - @Override - public Iterator iterator() - { - return new Iterator() - { - IntIterator baseIter = bitmapIndex.iterator(); - - @Override - public boolean hasNext() - { - return baseIter.hasNext(); - } - - @Override - public Integer next() - { - return baseIter.next(); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - - @Override - public void close() throws IOException - { - - } - }; + return new BitmapIndexedInts(bitmapIndex); } @Override @@ -301,4 +257,95 @@ public class IncrementalIndexAdapter implements IndexableAdapter { return index.getCapabilities(column); } + + @Override + public BitmapIndexSeeker getBitmapIndexSeeker(String dimension) + { + final Map dimInverted = invertedIndexes.get(dimension); + if (dimInverted == null) { + return new EmptyBitmapIndexSeeker(); + } + + return new BitmapIndexSeeker() + { + private String lastVal = null; + + @Override + public IndexedInts seek(String value) + { + if (value != null && GenericIndexed.STRING_STRATEGY.compare(value, lastVal) <= 0) { + throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", + value, lastVal); + } + lastVal = value; + final MutableBitmap bitmapIndex = dimInverted.get(value); + if (bitmapIndex == null) { + return new EmptyIndexedInts(); + } + return new BitmapIndexedInts(bitmapIndex); + } + }; + } + + static class BitmapIndexedInts implements IndexedInts { + + private final MutableBitmap bitmapIndex; + + BitmapIndexedInts(MutableBitmap bitmapIndex) + { + this.bitmapIndex = bitmapIndex; + } + + @Override + public int size() + { + return bitmapIndex.size(); + } + + @Override + public int get(int index) + { + // Slow for concise bitmaps, but is fast with roaring bitmaps, so it's just not supported. + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public Iterator iterator() + { + return new Iterator() + { + final IntIterator baseIter = bitmapIndex.iterator(); + + @Override + public boolean hasNext() + { + return baseIter.hasNext(); + } + + @Override + public Integer next() + { + return baseIter.next(); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public void fill(int index, int[] toFill) + { + throw new UnsupportedOperationException("fill not supported"); + } + + @Override + public void close() throws IOException + { + + } + } } diff --git a/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java b/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java new file mode 100644 index 00000000000..e79a6cda484 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java @@ -0,0 +1,97 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment; + +import java.io.File; + +import io.druid.segment.column.BitmapIndexSeeker; +import io.druid.segment.data.CompressedObjectStrategy; +import io.druid.segment.data.ConciseBitmapSerdeFactory; +import io.druid.segment.data.IncrementalIndexTest; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexAdapter; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.metamx.common.ISE; + +public class QueryableIndexIndexableAdapterTest { + private final static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger(); + private final static IndexIO INDEX_IO = TestHelper.getTestIndexIO(); + private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec( + new ConciseBitmapSerdeFactory(), + CompressedObjectStrategy.CompressionStrategy.LZ4, + CompressedObjectStrategy.CompressionStrategy.LZ4 + ); + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public final CloserRule closer = new CloserRule(false); + + @Test + public void testGetBitmapIndexSeeker() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); + IncrementalIndexTest.populateIndex(timestamp, toPersist); + + final File tempDir = temporaryFolder.newFolder(); + final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( + toPersist.getInterval(), + toPersist, + INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory() + ); + + QueryableIndex index = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist, + tempDir, + null, + INDEX_SPEC + ) + ) + ); + + IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); + BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1"); + IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0"); + Assert.assertEquals(0, indexedInts0.size()); + IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1"); + Assert.assertEquals(1, indexedInts1.size()); + try { + bitmapIndexSeeker.seek("4"); + Assert.assertFalse("Only support access in order", true); + } catch(ISE ise) { + Assert.assertTrue("Only support access in order", true); + } + IndexedInts indexedInts2 = bitmapIndexSeeker.seek("2"); + Assert.assertEquals(0, indexedInts2.size()); + IndexedInts indexedInts3 = bitmapIndexSeeker.seek("3"); + Assert.assertEquals(1, indexedInts3.size()); + IndexedInts indexedInts4 = bitmapIndexSeeker.seek("4"); + Assert.assertEquals(0, indexedInts4.size()); + } +} diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java new file mode 100644 index 00000000000..b272a687e64 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java @@ -0,0 +1,70 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.incremental; + +import io.druid.segment.IndexSpec; +import io.druid.segment.IndexableAdapter; +import io.druid.segment.column.BitmapIndexSeeker; +import io.druid.segment.data.CompressedObjectStrategy; +import io.druid.segment.data.ConciseBitmapSerdeFactory; +import io.druid.segment.data.IncrementalIndexTest; +import io.druid.segment.data.IndexedInts; + +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import com.metamx.common.ISE; + +public class IncrementalIndexAdapterTest { + private static final IndexSpec INDEX_SPEC = new IndexSpec( + new ConciseBitmapSerdeFactory(), + CompressedObjectStrategy.CompressionStrategy.LZ4.name().toLowerCase(), + CompressedObjectStrategy.CompressionStrategy.LZ4.name().toLowerCase() + ); + + @Test + public void testGetBitmapIndexSeeker() throws Exception + { + final long timestamp = System.currentTimeMillis(); + IncrementalIndex incrementalIndex = IncrementalIndexTest.createIndex(null); + IncrementalIndexTest.populateIndex(timestamp, incrementalIndex); + IndexableAdapter adapter = new IncrementalIndexAdapter( + incrementalIndex.getInterval(), + incrementalIndex, + INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory() + ); + BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1"); + IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0"); + Assert.assertEquals(0, indexedInts0.size()); + IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1"); + Assert.assertEquals(1, indexedInts1.size()); + try { + bitmapIndexSeeker.seek("01"); + Assert.assertFalse("Only support access in order", true); + } catch(ISE ise) { + Assert.assertTrue("Only support access in order", true); + } + IndexedInts indexedInts2 = bitmapIndexSeeker.seek("2"); + Assert.assertEquals(0, indexedInts2.size()); + IndexedInts indexedInts3 = bitmapIndexSeeker.seek("3"); + Assert.assertEquals(1, indexedInts3.size()); + IndexedInts indexedInts4 = bitmapIndexSeeker.seek("4"); + Assert.assertEquals(0, indexedInts4.size()); + } +}