From dd774ef4dde3b4bb1c917c158b5a6696aed4d456 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 15 Dec 2015 18:38:19 +0900 Subject: [PATCH] one-pass merging of dictionary & index --- .../java/io/druid/segment/IndexMerger.java | 505 ++++++++++-------- .../java/io/druid/segment/IndexMergerV9.java | 192 ++----- .../druid/segment/filter/SpatialFilter.java | 3 - .../segment/DictionaryMergeIteratorTest.java | 64 +++ .../io/druid/segment/IndexMergerTest.java | 4 +- 5 files changed, 395 insertions(+), 373 deletions(-) create mode 100644 processing/src/test/java/io/druid/segment/DictionaryMergeIteratorTest.java diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index ffaf43d80ba..d1e85df5750 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -21,9 +21,9 @@ package io.druid.segment; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -31,6 +31,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import com.google.common.collect.PeekingIterator; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.Files; @@ -45,6 +46,7 @@ import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.common.IAE; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.nary.BinaryFn; @@ -92,9 +94,12 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; import java.util.Set; import java.util.TreeSet; @@ -641,7 +646,7 @@ public class IndexMerger ArrayList> dimConversions = Lists.newArrayListWithCapacity(indexes.size()); final ArrayList convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size()); - for (IndexableAdapter index : indexes) { + for (int i = 0; i < indexes.size(); ++i) { dimConversions.add(Maps.newHashMap()); } @@ -651,22 +656,22 @@ public class IndexMerger ); writer.open(); - List> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1); - DimValueConverter[] converters = new DimValueConverter[indexes.size()]; + boolean dimHasNull = false; boolean dimHasValues = false; boolean dimAbsentFromSomeIndex = false; - boolean[] dimHasValuesByIndex = new boolean[indexes.size()]; + int numMergeIndex = 0; + Indexed dimValueLookup = null; + Indexed[] dimValueLookups = new Indexed[indexes.size() + 1]; for (int i = 0; i < indexes.size(); i++) { Indexed dimValues = indexes.get(i).getDimValueLookup(dimension); if (!isNullColumn(dimValues)) { dimHasValues = true; - dimHasValuesByIndex[i] = true; - dimValueLookups.add(dimValues); - converters[i] = new DimValueConverter(dimValues); + dimHasNull |= dimValues.indexOf(null) >= 0; + dimValueLookups[i] = dimValueLookup = dimValues; + numMergeIndex++; } else { dimAbsentFromSomeIndex = true; - dimHasValuesByIndex[i] = false; } } @@ -680,57 +685,33 @@ public class IndexMerger * later on, to allow rows from indexes without a particular dimension to merge correctly with * rows from indexes with null/empty str values for that dimension. */ - if (convertMissingDims) { - dimValueLookups.add(EMPTY_STR_DIM_VAL); - for (int i = 0; i < indexes.size(); i++) { - if (!dimHasValuesByIndex[i]) { - converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL); - } - } + if (convertMissingDims && !dimHasNull) { + dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL; + numMergeIndex++; } - Iterable dimensionValues = CombiningIterable.createSplatted( - Iterables.transform( - dimValueLookups, - new Function, Iterable>() - { - @Override - public Iterable apply(@Nullable Indexed indexed) - { - return Iterables.transform( - indexed, - new Function() - { - @Override - public String apply(@Nullable String input) - { - return (input == null) ? "" : input; - } - } - ); - } - } - ) - , - Ordering.natural().nullsFirst() - ); + int cardinality = 0; + if (numMergeIndex > 1) { + DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true); - int count = 0; - for (String value : dimensionValues) { - value = value == null ? "" : value; - writer.write(value); - - for (int i = 0; i < indexes.size(); i++) { - DimValueConverter converter = converters[i]; - if (converter != null) { - converter.convert(value, count); - } + while (iterator.hasNext()) { + writer.write(iterator.next()); } - ++count; + for (int i = 0; i < indexes.size(); i++) { + if (dimValueLookups[i] != null && iterator.needConversion(i)) { + dimConversions.get(i).put(dimension, iterator.conversions[i]); + } + } + cardinality = iterator.counter; + } else if (numMergeIndex == 1) { + for (String value : dimValueLookup) { + writer.write(value); + } + cardinality = dimValueLookup.size(); } - dimensionCardinalities.put(dimension, count); + dimensionCardinalities.put(dimension, cardinality); FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true); dimOuts.add(dimOut); @@ -738,12 +719,6 @@ public class IndexMerger writer.close(); serializerUtils.writeString(dimOut, dimension); ByteStreams.copy(writer.combineStreams(), dimOut); - for (int i = 0; i < indexes.size(); ++i) { - DimValueConverter converter = converters[i]; - if (converter != null) { - dimConversions.get(i).put(dimension, converters[i].getConversionBuffer()); - } - } ioPeon.cleanup(); } @@ -753,64 +728,14 @@ public class IndexMerger progress.progress(); startTime = System.currentTimeMillis(); - ArrayList> boats = Lists.newArrayListWithCapacity(indexes.size()); - - for (int i = 0; i < indexes.size(); ++i) { - final IndexableAdapter adapter = indexes.get(i); - - final int[] dimLookup = new int[mergedDimensions.size()]; - int count = 0; - for (String dim : adapter.getDimensionNames()) { - dimLookup[count] = mergedDimensions.indexOf(dim); - count++; - } - - final int[] metricLookup = new int[mergedMetrics.size()]; - count = 0; - for (String metric : adapter.getMetricNames()) { - metricLookup[count] = mergedMetrics.indexOf(metric); - count++; - } - - boats.add( - new MMappedIndexRowIterable( - Iterables.transform( - indexes.get(i).getRows(), - new Function() - { - @Override - public Rowboat apply(@Nullable Rowboat input) - { - int[][] newDims = new int[mergedDimensions.size()][]; - int j = 0; - for (int[] dim : input.getDims()) { - newDims[dimLookup[j]] = dim; - j++; - } - - Object[] newMetrics = new Object[mergedMetrics.size()]; - j = 0; - for (Object met : input.getMetrics()) { - newMetrics[metricLookup[j]] = met; - j++; - } - - return new Rowboat( - input.getTimestamp(), - newDims, - newMetrics, - input.getRowNum() - ); - } - } - ), - mergedDimensions, dimConversions.get(i), i, - convertMissingDimsFlags - ) - ); - } - - Iterable theRows = rowMergerFn.apply(boats); + Iterable theRows = makeRowIterable( + indexes, + mergedDimensions, + mergedMetrics, + dimConversions, + convertMissingDimsFlags, + rowMergerFn + ); CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create( ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY @@ -969,22 +894,15 @@ public class IndexMerger tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); } - DictIdSeeker[] dictIdSeeker = new DictIdSeeker[indexes.size()]; - for (int j = 0; j < indexes.size(); j++) { - IntBuffer dimConversion = dimConversions.get(j).get(dimension); - if (dimConversion != null) { - dictIdSeeker[j] = new DictIdSeeker((IntBuffer) dimConversion.asReadOnlyBuffer().rewind()); - } else { - dictIdSeeker[j] = new DictIdSeeker(null); - } - } + IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension); + //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. for (int dictId = 0; dictId < dimVals.size(); dictId++) { progress.progress(); List> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size()); for (int j = 0; j < indexes.size(); ++j) { int seekedDictId = dictIdSeeker[j].seek(dictId); - if (seekedDictId != DictIdSeeker.NOT_EXIST) { + if (seekedDictId != IndexSeeker.NOT_EXIST) { convertedInverteds.add( new ConvertingIndexedInts( indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) @@ -1094,6 +1012,98 @@ public class IndexMerger return outDir; } + protected Iterable makeRowIterable( + List indexes, + final List mergedDimensions, + final List mergedMetrics, + ArrayList> dimConversions, + ArrayList convertMissingDimsFlags, + Function>, Iterable> rowMergerFn + ) + { + ArrayList> boats = Lists.newArrayListWithCapacity(indexes.size()); + + for (int i = 0; i < indexes.size(); ++i) { + final IndexableAdapter adapter = indexes.get(i); + + final int[] dimLookup = toLookupMap(adapter.getDimensionNames(), mergedDimensions); + final int[] metricLookup = toLookupMap(adapter.getMetricNames(), mergedMetrics); + + Iterable target = indexes.get(i).getRows(); + if (dimLookup != null || metricLookup != null) { + // resize/reorder index table if needed + target = Iterables.transform( + target, + new Function() + { + @Override + public Rowboat apply(Rowboat input) + { + int[][] newDims = input.getDims(); + if (dimLookup != null) { + newDims = new int[mergedDimensions.size()][]; + int j = 0; + for (int[] dim : input.getDims()) { + newDims[dimLookup[j]] = dim; + j++; + } + } + + Object[] newMetrics = input.getMetrics(); + if (metricLookup != null) { + newMetrics = new Object[mergedMetrics.size()]; + int j = 0; + for (Object met : input.getMetrics()) { + newMetrics[metricLookup[j]] = met; + j++; + } + } + + return new Rowboat( + input.getTimestamp(), + newDims, + newMetrics, + input.getRowNum() + ); + } + } + ); + } + boats.add( + new MMappedIndexRowIterable( + target, mergedDimensions, dimConversions.get(i), i, convertMissingDimsFlags + ) + ); + } + + return rowMergerFn.apply(boats); + } + + private int[] toLookupMap(Indexed indexed, List values) + { + if (isSame(indexed, values)) { + return null; // no need to convert + } + int[] dimLookup = new int[values.size()]; + for (int i = 0; i < indexed.size(); i++) { + dimLookup[i] = values.indexOf(indexed.get(i)); + } + return dimLookup; + } + + private boolean isSame(Indexed indexed, List values) + { + if (indexed.size() != values.size()) { + return false; + } + for (int i = 0; i < indexed.size(); i++) { + if (!indexed.get(i).equals(values.get(i))) { + return false; + } + } + return true; + } + public static ArrayList mergeIndexed(final List> indexedLists) { Set retVal = Sets.newTreeSet(Ordering.natural().nullsFirst()); @@ -1133,92 +1143,60 @@ public class IndexMerger IndexIO.checkFileSize(indexFile); } - public static class DimValueConverter + protected IndexSeeker[] toIndexSeekers( + List adapters, + ArrayList> dimConversions, + String dimension + ) { - private final Indexed dimSet; - private final IntBuffer conversionBuf; - - private int currIndex; - private String lastVal = null; - private String currValue; - - DimValueConverter( - Indexed dimSet - ) - { - this.dimSet = dimSet; - conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer(); - - currIndex = 0; - currValue = null; - } - - public void convert(String value, int index) - { - if (dimSet.size() == 0) { - return; - } - if (lastVal != null) { - if (value.compareTo(lastVal) <= 0) { - throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", value, lastVal); - } - return; - } - if (currValue == null) { - currValue = dimSet.get(currIndex); - } - - while (currValue == null) { - conversionBuf.position(conversionBuf.position() + 1); - ++currIndex; - if (currIndex == dimSet.size()) { - lastVal = value; - return; - } - currValue = dimSet.get(currIndex); - } - - if (Objects.equal(currValue, value)) { - conversionBuf.put(index); - ++currIndex; - if (currIndex == dimSet.size()) { - lastVal = value; - } else { - currValue = dimSet.get(currIndex); - } - } else if (currValue.compareTo(value) < 0) { - throw new ISE( - "Skipped currValue[%s], currIndex[%,d]; incoming value[%s], index[%,d]", currValue, currIndex, value, index - ); + IndexSeeker[] seekers = new IndexSeeker[adapters.size()]; + for (int i = 0; i < adapters.size(); i++) { + IntBuffer dimConversion = dimConversions.get(i).get(dimension); + if (dimConversion != null) { + seekers[i] = new IndexSeekerWithConversion((IntBuffer) dimConversion.asReadOnlyBuffer().rewind()); + } else { + Indexed dimValueLookup = adapters.get(i).getDimValueLookup(dimension); + seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null ? 0 : dimValueLookup.size()); } } + return seekers; + } - public IntBuffer getConversionBuffer() + static interface IndexSeeker + { + int NOT_EXIST = -1; + int NOT_INIT = -1; + + int seek(int dictId); + } + + static class IndexSeekerWithoutConversion implements IndexSeeker + { + private final int limit; + + public IndexSeekerWithoutConversion(int limit) { - if (currIndex != conversionBuf.limit() || conversionBuf.hasRemaining()) { - throw new ISE( - "Asked for incomplete buffer. currIndex[%,d] != buf.limit[%,d]", currIndex, conversionBuf.limit() - ); - } - return (IntBuffer) conversionBuf.asReadOnlyBuffer().rewind(); + this.limit = limit; + } + + @Override + public int seek(int dictId) + { + return dictId < limit ? dictId : NOT_EXIST; } } /** * Get old dictId from new dictId, and only support access in order */ - public static class DictIdSeeker + static class IndexSeekerWithConversion implements IndexSeeker { - static final int NOT_EXIST = -1; - static final int NOT_INIT = -1; private final IntBuffer dimConversions; private int currIndex; private int currVal; private int lastVal; - DictIdSeeker( - IntBuffer dimConversions - ) + IndexSeekerWithConversion(IntBuffer dimConversions) { this.dimConversions = dimConversions; this.currIndex = 0; @@ -1233,8 +1211,9 @@ public class IndexMerger } if (lastVal != NOT_INIT) { if (dictId <= lastVal) { - throw new ISE("Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.", - dictId, lastVal + throw new ISE( + "Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.", + dictId, lastVal ); } return NOT_EXIST; @@ -1252,8 +1231,9 @@ public class IndexMerger } return ret; } else if (currVal < dictId) { - throw new ISE("Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]", - currVal, currIndex, dictId + throw new ISE( + "Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]", + currVal, currIndex, dictId ); } else { return NOT_EXIST; @@ -1357,29 +1337,23 @@ public class IndexMerger int[][] newDims = new int[convertedDims.size()][]; for (int i = 0; i < convertedDims.size(); ++i) { IntBuffer converter = converterArray[i]; - String dimName = convertedDims.get(i); - - if (converter == null) { - continue; - } - if (i >= dims.length) { continue; } - if (dims[i] == null) { - if (convertMissingDimsFlags.get(i)) { - newDims[i] = EMPTY_STR_DIM; - } + if (dims[i] == null && convertMissingDimsFlags.get(i)) { + newDims[i] = EMPTY_STR_DIM; + continue; + } + + if (converter == null) { + newDims[i] = dims[i]; continue; } newDims[i] = new int[dims[i].length]; for (int j = 0; j < dims[i].length; ++j) { - if (!converter.hasRemaining()) { - log.error("Converter mismatch! wtfbbq!"); - } newDims[i][j] = converter.get(dims[i][j]); } } @@ -1508,4 +1482,109 @@ public class IndexMerger } IndexIO.checkFileSize(metadataFile); } + + static class DictionaryMergeIterator implements Iterator + { + protected final IntBuffer[] conversions; + protected final PriorityQueue>> pQueue; + + protected int counter; + + DictionaryMergeIterator(Indexed[] dimValueLookups, boolean useDirect) + { + pQueue = new PriorityQueue<>( + dimValueLookups.length, + new Comparator>>() + { + @Override + public int compare(Pair> lhs, Pair> rhs) + { + return lhs.rhs.peek().compareTo(rhs.rhs.peek()); + } + } + ); + conversions = new IntBuffer[dimValueLookups.length]; + for (int i = 0; i < conversions.length; i++) { + if (dimValueLookups[i] == null) { + continue; + } + Indexed indexed = dimValueLookups[i]; + if (useDirect) { + conversions[i] = ByteBuffer.allocateDirect(indexed.size() * Ints.BYTES).asIntBuffer(); + } else { + conversions[i] = IntBuffer.allocate(indexed.size()); + } + + final PeekingIterator iter = Iterators.peekingIterator( + Iterators.transform( + indexed.iterator(), + new Function() + { + @Override + public String apply(@Nullable String input) + { + return Strings.nullToEmpty(input); + } + } + ) + ); + if (iter.hasNext()) { + pQueue.add(Pair.of(i, iter)); + } + } + } + + @Override + public boolean hasNext() + { + return !pQueue.isEmpty(); + } + + @Override + public String next() + { + Pair> smallest = pQueue.remove(); + if (smallest == null) { + throw new NoSuchElementException(); + } + final String value = writeTranslate(smallest, counter); + + while (!pQueue.isEmpty() && value.equals(pQueue.peek().rhs.peek())) { + writeTranslate(pQueue.remove(), counter); + } + counter++; + + return value; + } + + boolean needConversion(int index) + { + IntBuffer readOnly = conversions[index].asReadOnlyBuffer(); + readOnly.rewind(); + for (int i = 0; readOnly.hasRemaining(); i++) { + if (i != readOnly.get()) { + return true; + } + } + return false; + } + + private String writeTranslate(Pair> smallest, int counter) + { + final int index = smallest.lhs; + final String value = smallest.rhs.next(); + + conversions[index].put(counter); + if (smallest.rhs.hasNext()) { + pQueue.add(smallest); + } + return value; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("remove"); + } + } } diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 9154901d124..7368fbb62fb 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -73,7 +73,6 @@ import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -120,7 +119,6 @@ public class IndexMergerV9 extends IndexMerger adapters, new Function() { - @Nullable @Override public Metadata apply(IndexableAdapter input) { @@ -187,7 +185,6 @@ public class IndexMergerV9 extends IndexMerger mergedDimensions, mergedMetrics, dimConversions, - dimCardinalities, convertMissingDimsFlags, rowMergerFn ); @@ -526,15 +523,7 @@ public class IndexMergerV9 extends IndexMerger tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); } - DictIdSeeker[] dictIdSeeker = new DictIdSeeker[adapters.size()]; - for (int j = 0; j < adapters.size(); j++) { - IntBuffer dimConversion = dimConversions.get(j).get(dimension); - if (dimConversion != null) { - dictIdSeeker[j] = new DictIdSeeker((IntBuffer)dimConversion.asReadOnlyBuffer().rewind()); - } else { - dictIdSeeker[j] = new DictIdSeeker(null); - } - } + IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension); ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( nullRowsList.get(dimIndex) @@ -546,7 +535,7 @@ public class IndexMergerV9 extends IndexMerger List> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); for (int j = 0; j < adapters.size(); ++j) { int seekedDictId = dictIdSeeker[j].seek(dictId); - if (seekedDictId != DictIdSeeker.NOT_EXIST) { + if (seekedDictId != IndexSeeker.NOT_EXIST) { convertedInverteds.add( new ConvertingIndexedInts( adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) @@ -567,10 +556,9 @@ public class IndexMergerV9 extends IndexMerger ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset); if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) { - bitmapIndexWriters.get(dimIndex).write(nullRowBitmap.union(bitmapToWrite)); - } else { - bitmapIndexWriters.get(dimIndex).write(bitmapToWrite); + bitmapToWrite = nullRowBitmap.union(bitmapToWrite); } + bitmapIndexWriters.get(dimIndex).write(bitmapToWrite); if (spatialIndexWriter != null) { String dimVal = dimVals.get(dictId); @@ -794,78 +782,6 @@ public class IndexMergerV9 extends IndexMerger return dimWriters; } - private Iterable makeRowIterable( - final List adapters, - final List mergedDimensions, - final List mergedMetrics, - final ArrayList> dimConversions, - final Map dimCardinalities, - final ArrayList convertMissingDimsFlags, - final Function>, Iterable> rowMergerFn - ) - { - ArrayList> boats = Lists.newArrayListWithCapacity(adapters.size()); - - for (int i = 0; i < adapters.size(); ++i) { - final IndexableAdapter adapter = adapters.get(i); - - final int[] dimLookup = new int[mergedDimensions.size()]; - int count = 0; - for (String dim : adapter.getDimensionNames()) { - dimLookup[count] = mergedDimensions.indexOf(dim); - count++; - } - - final int[] metricLookup = new int[mergedMetrics.size()]; - count = 0; - for (String metric : adapter.getMetricNames()) { - metricLookup[count] = mergedMetrics.indexOf(metric); - count++; - } - - boats.add( - new MMappedIndexRowIterable( - Iterables.transform( - adapters.get(i).getRows(), - new Function() - { - @Override - public Rowboat apply(Rowboat input) - { - int[][] newDims = new int[mergedDimensions.size()][]; - int j = 0; - for (int[] dim : input.getDims()) { - newDims[dimLookup[j]] = dim; - j++; - } - - Object[] newMetrics = new Object[mergedMetrics.size()]; - j = 0; - for (Object met : input.getMetrics()) { - newMetrics[metricLookup[j]] = met; - j++; - } - - return new Rowboat( - input.getTimestamp(), - newDims, - newMetrics, - input.getRowNum() - ); - } - } - ), - mergedDimensions, - dimConversions.get(i), - i, - convertMissingDimsFlags - ) - ); - } - - return rowMergerFn.apply(boats); - } - private ArrayList> setupDimValueWriters( final IOPeon ioPeon, final List mergedDimensions @@ -884,7 +800,7 @@ public class IndexMergerV9 extends IndexMerger } private void writeDimValueAndSetupDimConversion( - final List adapters, + final List indexes, final ProgressIndicator progress, final List mergedDimensions, final Map dimensionCardinalities, @@ -898,34 +814,29 @@ public class IndexMergerV9 extends IndexMerger final String section = "setup dimension conversions"; progress.startSection(section); - for (int i = 0; i < adapters.size(); ++i) { + for (int i = 0; i < indexes.size(); ++i) { dimConversions.add(Maps.newHashMap()); } for (int dimIndex = 0; dimIndex < mergedDimensions.size(); ++dimIndex) { long dimStartTime = System.currentTimeMillis(); String dimension = mergedDimensions.get(dimIndex); - - // lookups for all dimension values of this dimension - List> dimValueLookups = Lists.newArrayListWithCapacity(adapters.size()); - - // each converter converts dim values of this dimension to global dictionary - DimValueConverter[] converters = new DimValueConverter[adapters.size()]; - + boolean dimHasNull = false; boolean dimHasValues = false; boolean dimAbsentFromSomeIndex = false; - boolean dimHasNull = false; - boolean[] dimHasValuesByIndex = new boolean[adapters.size()]; - for (int i = 0; i < adapters.size(); i++) { - Indexed dimValues = adapters.get(i).getDimValueLookup(dimension); + + int numMergeIndex = 0; + Indexed dimValueLookup = null; + Indexed[] dimValueLookups = new Indexed[indexes.size() + 1]; + for (int i = 0; i < indexes.size(); i++) { + Indexed dimValues = indexes.get(i).getDimValueLookup(dimension); if (!isNullColumn(dimValues)) { dimHasValues = true; - dimHasValuesByIndex[i] = true; - dimValueLookups.add(dimValues); - converters[i] = new DimValueConverter(dimValues); + dimHasNull |= dimValues.indexOf(null) >= 0; + dimValueLookups[i] = dimValueLookup = dimValues; + numMergeIndex++; } else { dimAbsentFromSomeIndex = true; - dimHasValuesByIndex[i] = false; } } @@ -939,58 +850,35 @@ public class IndexMergerV9 extends IndexMerger * later on, to allow rows from indexes without a particular dimension to merge correctly with * rows from indexes with null/empty str values for that dimension. */ - if (convertMissingDims) { - dimValueLookups.add(EMPTY_STR_DIM_VAL); - for (int i = 0; i < adapters.size(); i++) { - if (!dimHasValuesByIndex[i]) { - converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL); - } - } + if (convertMissingDims && !dimHasNull) { + dimHasNull = true; + dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL; + numMergeIndex++; } - // sort all dimension values and treat all null values as empty strings - Iterable dimensionValues = CombiningIterable.createSplatted( - Iterables.transform( - dimValueLookups, - new Function, Iterable>() - { - @Override - public Iterable apply(@Nullable Indexed indexed) - { - return Iterables.transform( - indexed, - new Function() - { - @Override - public String apply(@Nullable String input) - { - return (input == null) ? "" : input; - } - } - ); - } - } - ), Ordering.natural().nullsFirst() - ); - GenericIndexedWriter writer = dimValueWriters.get(dimIndex); - int cardinality = 0; - for (String value : dimensionValues) { - value = value == null ? "" : value; - writer.write(value); - if (value.length() == 0) { - dimHasNull = true; + int cardinality = 0; + if (numMergeIndex > 1) { + DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true); + + while (iterator.hasNext()) { + writer.write(iterator.next()); } - for (int i = 0; i < adapters.size(); i++) { - DimValueConverter converter = converters[i]; - if (converter != null) { - converter.convert(value, cardinality); + for (int i = 0; i < indexes.size(); i++) { + if (dimValueLookups[i] != null && iterator.needConversion(i)) { + dimConversions.get(i).put(dimension, iterator.conversions[i]); } } - ++cardinality; + cardinality = iterator.counter; + } else if (numMergeIndex == 1) { + for (String value : dimValueLookup) { + writer.write(value); + } + cardinality = dimValueLookup.size(); } + // Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later. dimHasNullFlags.add(dimHasNull); @@ -1009,14 +897,6 @@ public class IndexMergerV9 extends IndexMerger continue; } dimensionSkipFlag.add(false); - - // make the conversion - for (int i = 0; i < adapters.size(); ++i) { - DimValueConverter converter = converters[i]; - if (converter != null) { - dimConversions.get(i).put(dimension, converters[i].getConversionBuffer()); - } - } } progress.stopSection(section); } diff --git a/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java b/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java index 24f42ea2bbe..2ca221560d0 100644 --- a/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java @@ -46,9 +46,6 @@ public class SpatialFilter implements Filter public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) { Iterable search = selector.getSpatialIndex(dimension).search(bound); - for (ImmutableBitmap immutableBitmap : search) { - System.out.println(immutableBitmap); - } return selector.getBitmapFactory().union(search); } diff --git a/processing/src/test/java/io/druid/segment/DictionaryMergeIteratorTest.java b/processing/src/test/java/io/druid/segment/DictionaryMergeIteratorTest.java new file mode 100644 index 00000000000..9141b2c4c9e --- /dev/null +++ b/processing/src/test/java/io/druid/segment/DictionaryMergeIteratorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +import com.google.common.collect.Iterators; +import io.druid.segment.data.ArrayIndexed; +import io.druid.segment.data.Indexed; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DictionaryMergeIteratorTest +{ + + @Test + public void basicTest() + { + // a b c d e f + String[] s1 = {"a", "c", "d", "e"}; // 0 2 3 4 + String[] s2 = {"b", "c", "e"}; // 1 2 4 + String[] s3 = {"a", "d", "f"}; // 0 3 5 + String[] s4 = {"a", "b", "c"}; + String[] s5 = {"a", "b", "c", "d", "e", "f"}; + Indexed i1 = new ArrayIndexed(s1, String.class); + Indexed i2 = new ArrayIndexed(s2, String.class); + Indexed i3 = new ArrayIndexed(s3, String.class); + Indexed i4 = new ArrayIndexed(s4, String.class); + Indexed i5 = new ArrayIndexed(s5, String.class); + + IndexMerger.DictionaryMergeIterator iterator = new IndexMerger.DictionaryMergeIterator(new Indexed[]{i1, i2, i3, i4, i5}, false); + + Assert.assertArrayEquals(new String[]{"a", "b", "c", "d", "e", "f"}, Iterators.toArray(iterator, String.class)); + + Assert.assertArrayEquals(new int[] {0, 2, 3, 4}, iterator.conversions[0].array()); + Assert.assertArrayEquals(new int[] {1, 2, 4}, iterator.conversions[1].array()); + Assert.assertArrayEquals(new int[] {0, 3, 5}, iterator.conversions[2].array()); + Assert.assertArrayEquals(new int[] {0, 1, 2}, iterator.conversions[3].array()); + Assert.assertArrayEquals(new int[] {0, 1, 2, 3, 4, 5}, iterator.conversions[4].array()); + + Assert.assertTrue(iterator.needConversion(0)); + Assert.assertTrue(iterator.needConversion(1)); + Assert.assertTrue(iterator.needConversion(2)); + Assert.assertFalse(iterator.needConversion(3)); + Assert.assertFalse(iterator.needConversion(4)); + } +} diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 4f3562dd10f..ab0da2b32be 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -1668,7 +1668,9 @@ public class IndexMergerTest dimConversions.put(0); dimConversions.put(2); dimConversions.put(4); - IndexMerger.DictIdSeeker dictIdSeeker = new IndexMerger.DictIdSeeker((IntBuffer) dimConversions.asReadOnlyBuffer().rewind()); + IndexMerger.IndexSeeker dictIdSeeker = new IndexMerger.IndexSeekerWithConversion( + (IntBuffer) dimConversions.asReadOnlyBuffer().rewind() + ); Assert.assertEquals(0, dictIdSeeker.seek(0)); Assert.assertEquals(-1, dictIdSeeker.seek(1)); Assert.assertEquals(1, dictIdSeeker.seek(2));