one-pass merging of dictionary & index

This commit is contained in:
navis.ryu 2015-12-15 18:38:19 +09:00
parent edd7ce58aa
commit dd774ef4dd
5 changed files with 395 additions and 373 deletions

View File

@ -21,9 +21,9 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable; import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -31,6 +31,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
@ -45,6 +46,7 @@ import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
@ -92,9 +94,12 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@ -641,7 +646,7 @@ public class IndexMerger
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size()); ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size()); final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (IndexableAdapter index : indexes) { for (int i = 0; i < indexes.size(); ++i) {
dimConversions.add(Maps.<String, IntBuffer>newHashMap()); dimConversions.add(Maps.<String, IntBuffer>newHashMap());
} }
@ -651,22 +656,22 @@ public class IndexMerger
); );
writer.open(); writer.open();
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1); boolean dimHasNull = false;
DimValueConverter[] converters = new DimValueConverter[indexes.size()];
boolean dimHasValues = false; boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false; boolean dimAbsentFromSomeIndex = false;
boolean[] dimHasValuesByIndex = new boolean[indexes.size()];
int numMergeIndex = 0;
Indexed<String> dimValueLookup = null;
Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
for (int i = 0; i < indexes.size(); i++) { for (int i = 0; i < indexes.size(); i++) {
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension); Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
if (!isNullColumn(dimValues)) { if (!isNullColumn(dimValues)) {
dimHasValues = true; dimHasValues = true;
dimHasValuesByIndex[i] = true; dimHasNull |= dimValues.indexOf(null) >= 0;
dimValueLookups.add(dimValues); dimValueLookups[i] = dimValueLookup = dimValues;
converters[i] = new DimValueConverter(dimValues); numMergeIndex++;
} else { } else {
dimAbsentFromSomeIndex = true; dimAbsentFromSomeIndex = true;
dimHasValuesByIndex[i] = false;
} }
} }
@ -680,57 +685,33 @@ public class IndexMerger
* later on, to allow rows from indexes without a particular dimension to merge correctly with * later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with null/empty str values for that dimension. * rows from indexes with null/empty str values for that dimension.
*/ */
if (convertMissingDims) { if (convertMissingDims && !dimHasNull) {
dimValueLookups.add(EMPTY_STR_DIM_VAL); dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
for (int i = 0; i < indexes.size(); i++) { numMergeIndex++;
if (!dimHasValuesByIndex[i]) {
converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL);
}
}
} }
Iterable<String> dimensionValues = CombiningIterable.createSplatted( int cardinality = 0;
Iterables.transform( if (numMergeIndex > 1) {
dimValueLookups, DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
new Function<Indexed<String>, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable Indexed<String> indexed)
{
return Iterables.transform(
indexed,
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return (input == null) ? "" : input;
}
}
);
}
}
)
,
Ordering.<String>natural().nullsFirst()
);
int count = 0; while (iterator.hasNext()) {
for (String value : dimensionValues) { writer.write(iterator.next());
value = value == null ? "" : value;
writer.write(value);
for (int i = 0; i < indexes.size(); i++) {
DimValueConverter converter = converters[i];
if (converter != null) {
converter.convert(value, count);
}
} }
++count; for (int i = 0; i < indexes.size(); i++) {
if (dimValueLookups[i] != null && iterator.needConversion(i)) {
dimConversions.get(i).put(dimension, iterator.conversions[i]);
}
}
cardinality = iterator.counter;
} else if (numMergeIndex == 1) {
for (String value : dimValueLookup) {
writer.write(value);
}
cardinality = dimValueLookup.size();
} }
dimensionCardinalities.put(dimension, count); dimensionCardinalities.put(dimension, cardinality);
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true); FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
dimOuts.add(dimOut); dimOuts.add(dimOut);
@ -738,12 +719,6 @@ public class IndexMerger
writer.close(); writer.close();
serializerUtils.writeString(dimOut, dimension); serializerUtils.writeString(dimOut, dimension);
ByteStreams.copy(writer.combineStreams(), dimOut); ByteStreams.copy(writer.combineStreams(), dimOut);
for (int i = 0; i < indexes.size(); ++i) {
DimValueConverter converter = converters[i];
if (converter != null) {
dimConversions.get(i).put(dimension, converters[i].getConversionBuffer());
}
}
ioPeon.cleanup(); ioPeon.cleanup();
} }
@ -753,64 +728,14 @@ public class IndexMerger
progress.progress(); progress.progress();
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(indexes.size()); Iterable<Rowboat> theRows = makeRowIterable(
indexes,
for (int i = 0; i < indexes.size(); ++i) { mergedDimensions,
final IndexableAdapter adapter = indexes.get(i); mergedMetrics,
dimConversions,
final int[] dimLookup = new int[mergedDimensions.size()]; convertMissingDimsFlags,
int count = 0; rowMergerFn
for (String dim : adapter.getDimensionNames()) { );
dimLookup[count] = mergedDimensions.indexOf(dim);
count++;
}
final int[] metricLookup = new int[mergedMetrics.size()];
count = 0;
for (String metric : adapter.getMetricNames()) {
metricLookup[count] = mergedMetrics.indexOf(metric);
count++;
}
boats.add(
new MMappedIndexRowIterable(
Iterables.transform(
indexes.get(i).getRows(),
new Function<Rowboat, Rowboat>()
{
@Override
public Rowboat apply(@Nullable Rowboat input)
{
int[][] newDims = new int[mergedDimensions.size()][];
int j = 0;
for (int[] dim : input.getDims()) {
newDims[dimLookup[j]] = dim;
j++;
}
Object[] newMetrics = new Object[mergedMetrics.size()];
j = 0;
for (Object met : input.getMetrics()) {
newMetrics[metricLookup[j]] = met;
j++;
}
return new Rowboat(
input.getTimestamp(),
newDims,
newMetrics,
input.getRowNum()
);
}
}
),
mergedDimensions, dimConversions.get(i), i,
convertMissingDimsFlags
)
);
}
Iterable<Rowboat> theRows = rowMergerFn.apply(boats);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create( CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
@ -969,22 +894,15 @@ public class IndexMerger
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
} }
DictIdSeeker[] dictIdSeeker = new DictIdSeeker[indexes.size()]; IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension);
for (int j = 0; j < indexes.size(); j++) {
IntBuffer dimConversion = dimConversions.get(j).get(dimension);
if (dimConversion != null) {
dictIdSeeker[j] = new DictIdSeeker((IntBuffer) dimConversion.asReadOnlyBuffer().rewind());
} else {
dictIdSeeker[j] = new DictIdSeeker(null);
}
}
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) { for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress(); progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size()); List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
for (int j = 0; j < indexes.size(); ++j) { for (int j = 0; j < indexes.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId); int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != DictIdSeeker.NOT_EXIST) { if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add( convertedInverteds.add(
new ConvertingIndexedInts( new ConvertingIndexedInts(
indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
@ -1094,6 +1012,98 @@ public class IndexMerger
return outDir; return outDir;
} }
protected Iterable<Rowboat> makeRowIterable(
List<IndexableAdapter> indexes,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
ArrayList<Map<String, IntBuffer>> dimConversions,
ArrayList<Boolean> convertMissingDimsFlags,
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
)
{
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(indexes.size());
for (int i = 0; i < indexes.size(); ++i) {
final IndexableAdapter adapter = indexes.get(i);
final int[] dimLookup = toLookupMap(adapter.getDimensionNames(), mergedDimensions);
final int[] metricLookup = toLookupMap(adapter.getMetricNames(), mergedMetrics);
Iterable<Rowboat> target = indexes.get(i).getRows();
if (dimLookup != null || metricLookup != null) {
// resize/reorder index table if needed
target = Iterables.transform(
target,
new Function<Rowboat, Rowboat>()
{
@Override
public Rowboat apply(Rowboat input)
{
int[][] newDims = input.getDims();
if (dimLookup != null) {
newDims = new int[mergedDimensions.size()][];
int j = 0;
for (int[] dim : input.getDims()) {
newDims[dimLookup[j]] = dim;
j++;
}
}
Object[] newMetrics = input.getMetrics();
if (metricLookup != null) {
newMetrics = new Object[mergedMetrics.size()];
int j = 0;
for (Object met : input.getMetrics()) {
newMetrics[metricLookup[j]] = met;
j++;
}
}
return new Rowboat(
input.getTimestamp(),
newDims,
newMetrics,
input.getRowNum()
);
}
}
);
}
boats.add(
new MMappedIndexRowIterable(
target, mergedDimensions, dimConversions.get(i), i, convertMissingDimsFlags
)
);
}
return rowMergerFn.apply(boats);
}
private int[] toLookupMap(Indexed<String> indexed, List<String> values)
{
if (isSame(indexed, values)) {
return null; // no need to convert
}
int[] dimLookup = new int[values.size()];
for (int i = 0; i < indexed.size(); i++) {
dimLookup[i] = values.indexOf(indexed.get(i));
}
return dimLookup;
}
private boolean isSame(Indexed<String> indexed, List<String> values)
{
if (indexed.size() != values.size()) {
return false;
}
for (int i = 0; i < indexed.size(); i++) {
if (!indexed.get(i).equals(values.get(i))) {
return false;
}
}
return true;
}
public static <T extends Comparable> ArrayList<T> mergeIndexed(final List<Iterable<T>> indexedLists) public static <T extends Comparable> ArrayList<T> mergeIndexed(final List<Iterable<T>> indexedLists)
{ {
Set<T> retVal = Sets.newTreeSet(Ordering.<T>natural().nullsFirst()); Set<T> retVal = Sets.newTreeSet(Ordering.<T>natural().nullsFirst());
@ -1133,92 +1143,60 @@ public class IndexMerger
IndexIO.checkFileSize(indexFile); IndexIO.checkFileSize(indexFile);
} }
public static class DimValueConverter protected IndexSeeker[] toIndexSeekers(
List<IndexableAdapter> adapters,
ArrayList<Map<String, IntBuffer>> dimConversions,
String dimension
)
{ {
private final Indexed<String> dimSet; IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
private final IntBuffer conversionBuf; for (int i = 0; i < adapters.size(); i++) {
IntBuffer dimConversion = dimConversions.get(i).get(dimension);
private int currIndex; if (dimConversion != null) {
private String lastVal = null; seekers[i] = new IndexSeekerWithConversion((IntBuffer) dimConversion.asReadOnlyBuffer().rewind());
private String currValue; } else {
Indexed<String> dimValueLookup = adapters.get(i).getDimValueLookup(dimension);
DimValueConverter( seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null ? 0 : dimValueLookup.size());
Indexed<String> dimSet
)
{
this.dimSet = dimSet;
conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer();
currIndex = 0;
currValue = null;
}
public void convert(String value, int index)
{
if (dimSet.size() == 0) {
return;
}
if (lastVal != null) {
if (value.compareTo(lastVal) <= 0) {
throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", value, lastVal);
}
return;
}
if (currValue == null) {
currValue = dimSet.get(currIndex);
}
while (currValue == null) {
conversionBuf.position(conversionBuf.position() + 1);
++currIndex;
if (currIndex == dimSet.size()) {
lastVal = value;
return;
}
currValue = dimSet.get(currIndex);
}
if (Objects.equal(currValue, value)) {
conversionBuf.put(index);
++currIndex;
if (currIndex == dimSet.size()) {
lastVal = value;
} else {
currValue = dimSet.get(currIndex);
}
} else if (currValue.compareTo(value) < 0) {
throw new ISE(
"Skipped currValue[%s], currIndex[%,d]; incoming value[%s], index[%,d]", currValue, currIndex, value, index
);
} }
} }
return seekers;
}
public IntBuffer getConversionBuffer() static interface IndexSeeker
{
int NOT_EXIST = -1;
int NOT_INIT = -1;
int seek(int dictId);
}
static class IndexSeekerWithoutConversion implements IndexSeeker
{
private final int limit;
public IndexSeekerWithoutConversion(int limit)
{ {
if (currIndex != conversionBuf.limit() || conversionBuf.hasRemaining()) { this.limit = limit;
throw new ISE( }
"Asked for incomplete buffer. currIndex[%,d] != buf.limit[%,d]", currIndex, conversionBuf.limit()
); @Override
} public int seek(int dictId)
return (IntBuffer) conversionBuf.asReadOnlyBuffer().rewind(); {
return dictId < limit ? dictId : NOT_EXIST;
} }
} }
/** /**
* Get old dictId from new dictId, and only support access in order * Get old dictId from new dictId, and only support access in order
*/ */
public static class DictIdSeeker static class IndexSeekerWithConversion implements IndexSeeker
{ {
static final int NOT_EXIST = -1;
static final int NOT_INIT = -1;
private final IntBuffer dimConversions; private final IntBuffer dimConversions;
private int currIndex; private int currIndex;
private int currVal; private int currVal;
private int lastVal; private int lastVal;
DictIdSeeker( IndexSeekerWithConversion(IntBuffer dimConversions)
IntBuffer dimConversions
)
{ {
this.dimConversions = dimConversions; this.dimConversions = dimConversions;
this.currIndex = 0; this.currIndex = 0;
@ -1233,8 +1211,9 @@ public class IndexMerger
} }
if (lastVal != NOT_INIT) { if (lastVal != NOT_INIT) {
if (dictId <= lastVal) { if (dictId <= lastVal) {
throw new ISE("Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.", throw new ISE(
dictId, lastVal "Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.",
dictId, lastVal
); );
} }
return NOT_EXIST; return NOT_EXIST;
@ -1252,8 +1231,9 @@ public class IndexMerger
} }
return ret; return ret;
} else if (currVal < dictId) { } else if (currVal < dictId) {
throw new ISE("Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]", throw new ISE(
currVal, currIndex, dictId "Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]",
currVal, currIndex, dictId
); );
} else { } else {
return NOT_EXIST; return NOT_EXIST;
@ -1357,29 +1337,23 @@ public class IndexMerger
int[][] newDims = new int[convertedDims.size()][]; int[][] newDims = new int[convertedDims.size()][];
for (int i = 0; i < convertedDims.size(); ++i) { for (int i = 0; i < convertedDims.size(); ++i) {
IntBuffer converter = converterArray[i]; IntBuffer converter = converterArray[i];
String dimName = convertedDims.get(i);
if (converter == null) {
continue;
}
if (i >= dims.length) { if (i >= dims.length) {
continue; continue;
} }
if (dims[i] == null) { if (dims[i] == null && convertMissingDimsFlags.get(i)) {
if (convertMissingDimsFlags.get(i)) { newDims[i] = EMPTY_STR_DIM;
newDims[i] = EMPTY_STR_DIM; continue;
} }
if (converter == null) {
newDims[i] = dims[i];
continue; continue;
} }
newDims[i] = new int[dims[i].length]; newDims[i] = new int[dims[i].length];
for (int j = 0; j < dims[i].length; ++j) { for (int j = 0; j < dims[i].length; ++j) {
if (!converter.hasRemaining()) {
log.error("Converter mismatch! wtfbbq!");
}
newDims[i][j] = converter.get(dims[i][j]); newDims[i][j] = converter.get(dims[i][j]);
} }
} }
@ -1508,4 +1482,109 @@ public class IndexMerger
} }
IndexIO.checkFileSize(metadataFile); IndexIO.checkFileSize(metadataFile);
} }
static class DictionaryMergeIterator implements Iterator<String>
{
protected final IntBuffer[] conversions;
protected final PriorityQueue<Pair<Integer, PeekingIterator<String>>> pQueue;
protected int counter;
DictionaryMergeIterator(Indexed<String>[] dimValueLookups, boolean useDirect)
{
pQueue = new PriorityQueue<>(
dimValueLookups.length,
new Comparator<Pair<Integer, PeekingIterator<String>>>()
{
@Override
public int compare(Pair<Integer, PeekingIterator<String>> lhs, Pair<Integer, PeekingIterator<String>> rhs)
{
return lhs.rhs.peek().compareTo(rhs.rhs.peek());
}
}
);
conversions = new IntBuffer[dimValueLookups.length];
for (int i = 0; i < conversions.length; i++) {
if (dimValueLookups[i] == null) {
continue;
}
Indexed<String> indexed = dimValueLookups[i];
if (useDirect) {
conversions[i] = ByteBuffer.allocateDirect(indexed.size() * Ints.BYTES).asIntBuffer();
} else {
conversions[i] = IntBuffer.allocate(indexed.size());
}
final PeekingIterator<String> iter = Iterators.peekingIterator(
Iterators.transform(
indexed.iterator(),
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return Strings.nullToEmpty(input);
}
}
)
);
if (iter.hasNext()) {
pQueue.add(Pair.of(i, iter));
}
}
}
@Override
public boolean hasNext()
{
return !pQueue.isEmpty();
}
@Override
public String next()
{
Pair<Integer, PeekingIterator<String>> smallest = pQueue.remove();
if (smallest == null) {
throw new NoSuchElementException();
}
final String value = writeTranslate(smallest, counter);
while (!pQueue.isEmpty() && value.equals(pQueue.peek().rhs.peek())) {
writeTranslate(pQueue.remove(), counter);
}
counter++;
return value;
}
boolean needConversion(int index)
{
IntBuffer readOnly = conversions[index].asReadOnlyBuffer();
readOnly.rewind();
for (int i = 0; readOnly.hasRemaining(); i++) {
if (i != readOnly.get()) {
return true;
}
}
return false;
}
private String writeTranslate(Pair<Integer, PeekingIterator<String>> smallest, int counter)
{
final int index = smallest.lhs;
final String value = smallest.rhs.next();
conversions[index].put(counter);
if (smallest.rhs.hasNext()) {
pQueue.add(smallest);
}
return value;
}
@Override
public void remove()
{
throw new UnsupportedOperationException("remove");
}
}
} }

View File

@ -73,7 +73,6 @@ import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -120,7 +119,6 @@ public class IndexMergerV9 extends IndexMerger
adapters, adapters,
new Function<IndexableAdapter, Metadata>() new Function<IndexableAdapter, Metadata>()
{ {
@Nullable
@Override @Override
public Metadata apply(IndexableAdapter input) public Metadata apply(IndexableAdapter input)
{ {
@ -187,7 +185,6 @@ public class IndexMergerV9 extends IndexMerger
mergedDimensions, mergedDimensions,
mergedMetrics, mergedMetrics,
dimConversions, dimConversions,
dimCardinalities,
convertMissingDimsFlags, convertMissingDimsFlags,
rowMergerFn rowMergerFn
); );
@ -526,15 +523,7 @@ public class IndexMergerV9 extends IndexMerger
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
} }
DictIdSeeker[] dictIdSeeker = new DictIdSeeker[adapters.size()]; IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension);
for (int j = 0; j < adapters.size(); j++) {
IntBuffer dimConversion = dimConversions.get(j).get(dimension);
if (dimConversion != null) {
dictIdSeeker[j] = new DictIdSeeker((IntBuffer)dimConversion.asReadOnlyBuffer().rewind());
} else {
dictIdSeeker[j] = new DictIdSeeker(null);
}
}
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
nullRowsList.get(dimIndex) nullRowsList.get(dimIndex)
@ -546,7 +535,7 @@ public class IndexMergerV9 extends IndexMerger
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) { for (int j = 0; j < adapters.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId); int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != DictIdSeeker.NOT_EXIST) { if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add( convertedInverteds.add(
new ConvertingIndexedInts( new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
@ -567,10 +556,9 @@ public class IndexMergerV9 extends IndexMerger
ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset); ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset);
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) { if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
bitmapIndexWriters.get(dimIndex).write(nullRowBitmap.union(bitmapToWrite)); bitmapToWrite = nullRowBitmap.union(bitmapToWrite);
} else {
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
} }
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
if (spatialIndexWriter != null) { if (spatialIndexWriter != null) {
String dimVal = dimVals.get(dictId); String dimVal = dimVals.get(dictId);
@ -794,78 +782,6 @@ public class IndexMergerV9 extends IndexMerger
return dimWriters; return dimWriters;
} }
private Iterable<Rowboat> makeRowIterable(
final List<IndexableAdapter> adapters,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final ArrayList<Map<String, IntBuffer>> dimConversions,
final Map<String, Integer> dimCardinalities,
final ArrayList<Boolean> convertMissingDimsFlags,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
)
{
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(adapters.size());
for (int i = 0; i < adapters.size(); ++i) {
final IndexableAdapter adapter = adapters.get(i);
final int[] dimLookup = new int[mergedDimensions.size()];
int count = 0;
for (String dim : adapter.getDimensionNames()) {
dimLookup[count] = mergedDimensions.indexOf(dim);
count++;
}
final int[] metricLookup = new int[mergedMetrics.size()];
count = 0;
for (String metric : adapter.getMetricNames()) {
metricLookup[count] = mergedMetrics.indexOf(metric);
count++;
}
boats.add(
new MMappedIndexRowIterable(
Iterables.transform(
adapters.get(i).getRows(),
new Function<Rowboat, Rowboat>()
{
@Override
public Rowboat apply(Rowboat input)
{
int[][] newDims = new int[mergedDimensions.size()][];
int j = 0;
for (int[] dim : input.getDims()) {
newDims[dimLookup[j]] = dim;
j++;
}
Object[] newMetrics = new Object[mergedMetrics.size()];
j = 0;
for (Object met : input.getMetrics()) {
newMetrics[metricLookup[j]] = met;
j++;
}
return new Rowboat(
input.getTimestamp(),
newDims,
newMetrics,
input.getRowNum()
);
}
}
),
mergedDimensions,
dimConversions.get(i),
i,
convertMissingDimsFlags
)
);
}
return rowMergerFn.apply(boats);
}
private ArrayList<GenericIndexedWriter<String>> setupDimValueWriters( private ArrayList<GenericIndexedWriter<String>> setupDimValueWriters(
final IOPeon ioPeon, final IOPeon ioPeon,
final List<String> mergedDimensions final List<String> mergedDimensions
@ -884,7 +800,7 @@ public class IndexMergerV9 extends IndexMerger
} }
private void writeDimValueAndSetupDimConversion( private void writeDimValueAndSetupDimConversion(
final List<IndexableAdapter> adapters, final List<IndexableAdapter> indexes,
final ProgressIndicator progress, final ProgressIndicator progress,
final List<String> mergedDimensions, final List<String> mergedDimensions,
final Map<String, Integer> dimensionCardinalities, final Map<String, Integer> dimensionCardinalities,
@ -898,34 +814,29 @@ public class IndexMergerV9 extends IndexMerger
final String section = "setup dimension conversions"; final String section = "setup dimension conversions";
progress.startSection(section); progress.startSection(section);
for (int i = 0; i < adapters.size(); ++i) { for (int i = 0; i < indexes.size(); ++i) {
dimConversions.add(Maps.<String, IntBuffer>newHashMap()); dimConversions.add(Maps.<String, IntBuffer>newHashMap());
} }
for (int dimIndex = 0; dimIndex < mergedDimensions.size(); ++dimIndex) { for (int dimIndex = 0; dimIndex < mergedDimensions.size(); ++dimIndex) {
long dimStartTime = System.currentTimeMillis(); long dimStartTime = System.currentTimeMillis();
String dimension = mergedDimensions.get(dimIndex); String dimension = mergedDimensions.get(dimIndex);
boolean dimHasNull = false;
// lookups for all dimension values of this dimension
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(adapters.size());
// each converter converts dim values of this dimension to global dictionary
DimValueConverter[] converters = new DimValueConverter[adapters.size()];
boolean dimHasValues = false; boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false; boolean dimAbsentFromSomeIndex = false;
boolean dimHasNull = false;
boolean[] dimHasValuesByIndex = new boolean[adapters.size()]; int numMergeIndex = 0;
for (int i = 0; i < adapters.size(); i++) { Indexed<String> dimValueLookup = null;
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension); Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
for (int i = 0; i < indexes.size(); i++) {
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
if (!isNullColumn(dimValues)) { if (!isNullColumn(dimValues)) {
dimHasValues = true; dimHasValues = true;
dimHasValuesByIndex[i] = true; dimHasNull |= dimValues.indexOf(null) >= 0;
dimValueLookups.add(dimValues); dimValueLookups[i] = dimValueLookup = dimValues;
converters[i] = new DimValueConverter(dimValues); numMergeIndex++;
} else { } else {
dimAbsentFromSomeIndex = true; dimAbsentFromSomeIndex = true;
dimHasValuesByIndex[i] = false;
} }
} }
@ -939,58 +850,35 @@ public class IndexMergerV9 extends IndexMerger
* later on, to allow rows from indexes without a particular dimension to merge correctly with * later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with null/empty str values for that dimension. * rows from indexes with null/empty str values for that dimension.
*/ */
if (convertMissingDims) { if (convertMissingDims && !dimHasNull) {
dimValueLookups.add(EMPTY_STR_DIM_VAL); dimHasNull = true;
for (int i = 0; i < adapters.size(); i++) { dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
if (!dimHasValuesByIndex[i]) { numMergeIndex++;
converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL);
}
}
} }
// sort all dimension values and treat all null values as empty strings
Iterable<String> dimensionValues = CombiningIterable.createSplatted(
Iterables.transform(
dimValueLookups,
new Function<Indexed<String>, Iterable<String>>()
{
@Override
public Iterable<String> apply(@Nullable Indexed<String> indexed)
{
return Iterables.transform(
indexed,
new Function<String, String>()
{
@Override
public String apply(@Nullable String input)
{
return (input == null) ? "" : input;
}
}
);
}
}
), Ordering.<String>natural().nullsFirst()
);
GenericIndexedWriter<String> writer = dimValueWriters.get(dimIndex); GenericIndexedWriter<String> writer = dimValueWriters.get(dimIndex);
int cardinality = 0;
for (String value : dimensionValues) {
value = value == null ? "" : value;
writer.write(value);
if (value.length() == 0) { int cardinality = 0;
dimHasNull = true; if (numMergeIndex > 1) {
DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
while (iterator.hasNext()) {
writer.write(iterator.next());
} }
for (int i = 0; i < adapters.size(); i++) { for (int i = 0; i < indexes.size(); i++) {
DimValueConverter converter = converters[i]; if (dimValueLookups[i] != null && iterator.needConversion(i)) {
if (converter != null) { dimConversions.get(i).put(dimension, iterator.conversions[i]);
converter.convert(value, cardinality);
} }
} }
++cardinality; cardinality = iterator.counter;
} else if (numMergeIndex == 1) {
for (String value : dimValueLookup) {
writer.write(value);
}
cardinality = dimValueLookup.size();
} }
// Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later. // Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later.
dimHasNullFlags.add(dimHasNull); dimHasNullFlags.add(dimHasNull);
@ -1009,14 +897,6 @@ public class IndexMergerV9 extends IndexMerger
continue; continue;
} }
dimensionSkipFlag.add(false); dimensionSkipFlag.add(false);
// make the conversion
for (int i = 0; i < adapters.size(); ++i) {
DimValueConverter converter = converters[i];
if (converter != null) {
dimConversions.get(i).put(dimension, converters[i].getConversionBuffer());
}
}
} }
progress.stopSection(section); progress.stopSection(section);
} }

View File

@ -46,9 +46,6 @@ public class SpatialFilter implements Filter
public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector)
{ {
Iterable<ImmutableBitmap> search = selector.getSpatialIndex(dimension).search(bound); Iterable<ImmutableBitmap> search = selector.getSpatialIndex(dimension).search(bound);
for (ImmutableBitmap immutableBitmap : search) {
System.out.println(immutableBitmap);
}
return selector.getBitmapFactory().union(search); return selector.getBitmapFactory().union(search);
} }

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.collect.Iterators;
import io.druid.segment.data.ArrayIndexed;
import io.druid.segment.data.Indexed;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class DictionaryMergeIteratorTest
{
@Test
public void basicTest()
{
// a b c d e f
String[] s1 = {"a", "c", "d", "e"}; // 0 2 3 4
String[] s2 = {"b", "c", "e"}; // 1 2 4
String[] s3 = {"a", "d", "f"}; // 0 3 5
String[] s4 = {"a", "b", "c"};
String[] s5 = {"a", "b", "c", "d", "e", "f"};
Indexed<String> i1 = new ArrayIndexed<String>(s1, String.class);
Indexed<String> i2 = new ArrayIndexed<String>(s2, String.class);
Indexed<String> i3 = new ArrayIndexed<String>(s3, String.class);
Indexed<String> i4 = new ArrayIndexed<String>(s4, String.class);
Indexed<String> i5 = new ArrayIndexed<String>(s5, String.class);
IndexMerger.DictionaryMergeIterator iterator = new IndexMerger.DictionaryMergeIterator(new Indexed[]{i1, i2, i3, i4, i5}, false);
Assert.assertArrayEquals(new String[]{"a", "b", "c", "d", "e", "f"}, Iterators.toArray(iterator, String.class));
Assert.assertArrayEquals(new int[] {0, 2, 3, 4}, iterator.conversions[0].array());
Assert.assertArrayEquals(new int[] {1, 2, 4}, iterator.conversions[1].array());
Assert.assertArrayEquals(new int[] {0, 3, 5}, iterator.conversions[2].array());
Assert.assertArrayEquals(new int[] {0, 1, 2}, iterator.conversions[3].array());
Assert.assertArrayEquals(new int[] {0, 1, 2, 3, 4, 5}, iterator.conversions[4].array());
Assert.assertTrue(iterator.needConversion(0));
Assert.assertTrue(iterator.needConversion(1));
Assert.assertTrue(iterator.needConversion(2));
Assert.assertFalse(iterator.needConversion(3));
Assert.assertFalse(iterator.needConversion(4));
}
}

View File

@ -1668,7 +1668,9 @@ public class IndexMergerTest
dimConversions.put(0); dimConversions.put(0);
dimConversions.put(2); dimConversions.put(2);
dimConversions.put(4); dimConversions.put(4);
IndexMerger.DictIdSeeker dictIdSeeker = new IndexMerger.DictIdSeeker((IntBuffer) dimConversions.asReadOnlyBuffer().rewind()); IndexMerger.IndexSeeker dictIdSeeker = new IndexMerger.IndexSeekerWithConversion(
(IntBuffer) dimConversions.asReadOnlyBuffer().rewind()
);
Assert.assertEquals(0, dictIdSeeker.seek(0)); Assert.assertEquals(0, dictIdSeeker.seek(0));
Assert.assertEquals(-1, dictIdSeeker.seek(1)); Assert.assertEquals(-1, dictIdSeeker.seek(1));
Assert.assertEquals(1, dictIdSeeker.seek(2)); Assert.assertEquals(1, dictIdSeeker.seek(2));