mirror of https://github.com/apache/druid.git
Merge pull request #2094 from navis/simplify-index-merge
Simplifying dimension merging
This commit is contained in:
commit
93c50d8538
|
@ -21,9 +21,9 @@ package io.druid.segment;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -31,6 +31,7 @@ import com.google.common.collect.Iterators;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.PeekingIterator;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.google.common.io.Files;
|
||||
|
@ -45,6 +46,7 @@ import com.metamx.collections.spatial.RTree;
|
|||
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.guava.MergeIterable;
|
||||
import com.metamx.common.guava.nary.BinaryFn;
|
||||
|
@ -92,9 +94,12 @@ import java.nio.MappedByteBuffer;
|
|||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
|
@ -641,7 +646,7 @@ public class IndexMerger
|
|||
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
|
||||
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||
|
||||
for (IndexableAdapter index : indexes) {
|
||||
for (int i = 0; i < indexes.size(); ++i) {
|
||||
dimConversions.add(Maps.<String, IntBuffer>newHashMap());
|
||||
}
|
||||
|
||||
|
@ -651,22 +656,22 @@ public class IndexMerger
|
|||
);
|
||||
writer.open();
|
||||
|
||||
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1);
|
||||
DimValueConverter[] converters = new DimValueConverter[indexes.size()];
|
||||
boolean dimHasNull = false;
|
||||
boolean dimHasValues = false;
|
||||
boolean dimAbsentFromSomeIndex = false;
|
||||
boolean[] dimHasValuesByIndex = new boolean[indexes.size()];
|
||||
|
||||
int numMergeIndex = 0;
|
||||
Indexed<String> dimValueLookup = null;
|
||||
Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
|
||||
if (!isNullColumn(dimValues)) {
|
||||
dimHasValues = true;
|
||||
dimHasValuesByIndex[i] = true;
|
||||
dimValueLookups.add(dimValues);
|
||||
converters[i] = new DimValueConverter(dimValues);
|
||||
dimHasNull |= dimValues.indexOf(null) >= 0;
|
||||
dimValueLookups[i] = dimValueLookup = dimValues;
|
||||
numMergeIndex++;
|
||||
} else {
|
||||
dimAbsentFromSomeIndex = true;
|
||||
dimHasValuesByIndex[i] = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -680,57 +685,33 @@ public class IndexMerger
|
|||
* later on, to allow rows from indexes without a particular dimension to merge correctly with
|
||||
* rows from indexes with null/empty str values for that dimension.
|
||||
*/
|
||||
if (convertMissingDims) {
|
||||
dimValueLookups.add(EMPTY_STR_DIM_VAL);
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
if (!dimHasValuesByIndex[i]) {
|
||||
converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL);
|
||||
}
|
||||
}
|
||||
if (convertMissingDims && !dimHasNull) {
|
||||
dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
|
||||
numMergeIndex++;
|
||||
}
|
||||
|
||||
Iterable<String> dimensionValues = CombiningIterable.createSplatted(
|
||||
Iterables.transform(
|
||||
dimValueLookups,
|
||||
new Function<Indexed<String>, Iterable<String>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<String> apply(@Nullable Indexed<String> indexed)
|
||||
{
|
||||
return Iterables.transform(
|
||||
indexed,
|
||||
new Function<String, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable String input)
|
||||
{
|
||||
return (input == null) ? "" : input;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
,
|
||||
Ordering.<String>natural().nullsFirst()
|
||||
);
|
||||
int cardinality = 0;
|
||||
if (numMergeIndex > 1) {
|
||||
DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
|
||||
|
||||
int count = 0;
|
||||
for (String value : dimensionValues) {
|
||||
value = value == null ? "" : value;
|
||||
writer.write(value);
|
||||
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
DimValueConverter converter = converters[i];
|
||||
if (converter != null) {
|
||||
converter.convert(value, count);
|
||||
}
|
||||
while (iterator.hasNext()) {
|
||||
writer.write(iterator.next());
|
||||
}
|
||||
|
||||
++count;
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
if (dimValueLookups[i] != null && iterator.needConversion(i)) {
|
||||
dimConversions.get(i).put(dimension, iterator.conversions[i]);
|
||||
}
|
||||
}
|
||||
cardinality = iterator.counter;
|
||||
} else if (numMergeIndex == 1) {
|
||||
for (String value : dimValueLookup) {
|
||||
writer.write(value);
|
||||
}
|
||||
cardinality = dimValueLookup.size();
|
||||
}
|
||||
|
||||
dimensionCardinalities.put(dimension, count);
|
||||
dimensionCardinalities.put(dimension, cardinality);
|
||||
|
||||
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
|
||||
dimOuts.add(dimOut);
|
||||
|
@ -738,12 +719,6 @@ public class IndexMerger
|
|||
writer.close();
|
||||
serializerUtils.writeString(dimOut, dimension);
|
||||
ByteStreams.copy(writer.combineStreams(), dimOut);
|
||||
for (int i = 0; i < indexes.size(); ++i) {
|
||||
DimValueConverter converter = converters[i];
|
||||
if (converter != null) {
|
||||
dimConversions.get(i).put(dimension, converters[i].getConversionBuffer());
|
||||
}
|
||||
}
|
||||
|
||||
ioPeon.cleanup();
|
||||
}
|
||||
|
@ -753,64 +728,14 @@ public class IndexMerger
|
|||
progress.progress();
|
||||
startTime = System.currentTimeMillis();
|
||||
|
||||
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(indexes.size());
|
||||
|
||||
for (int i = 0; i < indexes.size(); ++i) {
|
||||
final IndexableAdapter adapter = indexes.get(i);
|
||||
|
||||
final int[] dimLookup = new int[mergedDimensions.size()];
|
||||
int count = 0;
|
||||
for (String dim : adapter.getDimensionNames()) {
|
||||
dimLookup[count] = mergedDimensions.indexOf(dim);
|
||||
count++;
|
||||
}
|
||||
|
||||
final int[] metricLookup = new int[mergedMetrics.size()];
|
||||
count = 0;
|
||||
for (String metric : adapter.getMetricNames()) {
|
||||
metricLookup[count] = mergedMetrics.indexOf(metric);
|
||||
count++;
|
||||
}
|
||||
|
||||
boats.add(
|
||||
new MMappedIndexRowIterable(
|
||||
Iterables.transform(
|
||||
indexes.get(i).getRows(),
|
||||
new Function<Rowboat, Rowboat>()
|
||||
{
|
||||
@Override
|
||||
public Rowboat apply(@Nullable Rowboat input)
|
||||
{
|
||||
int[][] newDims = new int[mergedDimensions.size()][];
|
||||
int j = 0;
|
||||
for (int[] dim : input.getDims()) {
|
||||
newDims[dimLookup[j]] = dim;
|
||||
j++;
|
||||
}
|
||||
|
||||
Object[] newMetrics = new Object[mergedMetrics.size()];
|
||||
j = 0;
|
||||
for (Object met : input.getMetrics()) {
|
||||
newMetrics[metricLookup[j]] = met;
|
||||
j++;
|
||||
}
|
||||
|
||||
return new Rowboat(
|
||||
input.getTimestamp(),
|
||||
newDims,
|
||||
newMetrics,
|
||||
input.getRowNum()
|
||||
);
|
||||
}
|
||||
}
|
||||
),
|
||||
mergedDimensions, dimConversions.get(i), i,
|
||||
convertMissingDimsFlags
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
Iterable<Rowboat> theRows = rowMergerFn.apply(boats);
|
||||
Iterable<Rowboat> theRows = makeRowIterable(
|
||||
indexes,
|
||||
mergedDimensions,
|
||||
mergedMetrics,
|
||||
dimConversions,
|
||||
convertMissingDimsFlags,
|
||||
rowMergerFn
|
||||
);
|
||||
|
||||
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
|
||||
ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
|
||||
|
@ -969,22 +894,15 @@ public class IndexMerger
|
|||
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
|
||||
}
|
||||
|
||||
DictIdSeeker[] dictIdSeeker = new DictIdSeeker[indexes.size()];
|
||||
for (int j = 0; j < indexes.size(); j++) {
|
||||
IntBuffer dimConversion = dimConversions.get(j).get(dimension);
|
||||
if (dimConversion != null) {
|
||||
dictIdSeeker[j] = new DictIdSeeker((IntBuffer) dimConversion.asReadOnlyBuffer().rewind());
|
||||
} else {
|
||||
dictIdSeeker[j] = new DictIdSeeker(null);
|
||||
}
|
||||
}
|
||||
IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension);
|
||||
|
||||
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
|
||||
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
|
||||
progress.progress();
|
||||
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
|
||||
for (int j = 0; j < indexes.size(); ++j) {
|
||||
int seekedDictId = dictIdSeeker[j].seek(dictId);
|
||||
if (seekedDictId != DictIdSeeker.NOT_EXIST) {
|
||||
if (seekedDictId != IndexSeeker.NOT_EXIST) {
|
||||
convertedInverteds.add(
|
||||
new ConvertingIndexedInts(
|
||||
indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
|
||||
|
@ -1094,6 +1012,98 @@ public class IndexMerger
|
|||
return outDir;
|
||||
}
|
||||
|
||||
protected Iterable<Rowboat> makeRowIterable(
|
||||
List<IndexableAdapter> indexes,
|
||||
final List<String> mergedDimensions,
|
||||
final List<String> mergedMetrics,
|
||||
ArrayList<Map<String, IntBuffer>> dimConversions,
|
||||
ArrayList<Boolean> convertMissingDimsFlags,
|
||||
Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
|
||||
)
|
||||
{
|
||||
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(indexes.size());
|
||||
|
||||
for (int i = 0; i < indexes.size(); ++i) {
|
||||
final IndexableAdapter adapter = indexes.get(i);
|
||||
|
||||
final int[] dimLookup = toLookupMap(adapter.getDimensionNames(), mergedDimensions);
|
||||
final int[] metricLookup = toLookupMap(adapter.getMetricNames(), mergedMetrics);
|
||||
|
||||
Iterable<Rowboat> target = indexes.get(i).getRows();
|
||||
if (dimLookup != null || metricLookup != null) {
|
||||
// resize/reorder index table if needed
|
||||
target = Iterables.transform(
|
||||
target,
|
||||
new Function<Rowboat, Rowboat>()
|
||||
{
|
||||
@Override
|
||||
public Rowboat apply(Rowboat input)
|
||||
{
|
||||
int[][] newDims = input.getDims();
|
||||
if (dimLookup != null) {
|
||||
newDims = new int[mergedDimensions.size()][];
|
||||
int j = 0;
|
||||
for (int[] dim : input.getDims()) {
|
||||
newDims[dimLookup[j]] = dim;
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
Object[] newMetrics = input.getMetrics();
|
||||
if (metricLookup != null) {
|
||||
newMetrics = new Object[mergedMetrics.size()];
|
||||
int j = 0;
|
||||
for (Object met : input.getMetrics()) {
|
||||
newMetrics[metricLookup[j]] = met;
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
return new Rowboat(
|
||||
input.getTimestamp(),
|
||||
newDims,
|
||||
newMetrics,
|
||||
input.getRowNum()
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
boats.add(
|
||||
new MMappedIndexRowIterable(
|
||||
target, mergedDimensions, dimConversions.get(i), i, convertMissingDimsFlags
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return rowMergerFn.apply(boats);
|
||||
}
|
||||
|
||||
private int[] toLookupMap(Indexed<String> indexed, List<String> values)
|
||||
{
|
||||
if (isSame(indexed, values)) {
|
||||
return null; // no need to convert
|
||||
}
|
||||
int[] dimLookup = new int[values.size()];
|
||||
for (int i = 0; i < indexed.size(); i++) {
|
||||
dimLookup[i] = values.indexOf(indexed.get(i));
|
||||
}
|
||||
return dimLookup;
|
||||
}
|
||||
|
||||
private boolean isSame(Indexed<String> indexed, List<String> values)
|
||||
{
|
||||
if (indexed.size() != values.size()) {
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < indexed.size(); i++) {
|
||||
if (!indexed.get(i).equals(values.get(i))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static <T extends Comparable> ArrayList<T> mergeIndexed(final List<Iterable<T>> indexedLists)
|
||||
{
|
||||
Set<T> retVal = Sets.newTreeSet(Ordering.<T>natural().nullsFirst());
|
||||
|
@ -1133,92 +1143,60 @@ public class IndexMerger
|
|||
IndexIO.checkFileSize(indexFile);
|
||||
}
|
||||
|
||||
public static class DimValueConverter
|
||||
protected IndexSeeker[] toIndexSeekers(
|
||||
List<IndexableAdapter> adapters,
|
||||
ArrayList<Map<String, IntBuffer>> dimConversions,
|
||||
String dimension
|
||||
)
|
||||
{
|
||||
private final Indexed<String> dimSet;
|
||||
private final IntBuffer conversionBuf;
|
||||
|
||||
private int currIndex;
|
||||
private String lastVal = null;
|
||||
private String currValue;
|
||||
|
||||
DimValueConverter(
|
||||
Indexed<String> dimSet
|
||||
)
|
||||
{
|
||||
this.dimSet = dimSet;
|
||||
conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer();
|
||||
|
||||
currIndex = 0;
|
||||
currValue = null;
|
||||
}
|
||||
|
||||
public void convert(String value, int index)
|
||||
{
|
||||
if (dimSet.size() == 0) {
|
||||
return;
|
||||
}
|
||||
if (lastVal != null) {
|
||||
if (value.compareTo(lastVal) <= 0) {
|
||||
throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", value, lastVal);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (currValue == null) {
|
||||
currValue = dimSet.get(currIndex);
|
||||
}
|
||||
|
||||
while (currValue == null) {
|
||||
conversionBuf.position(conversionBuf.position() + 1);
|
||||
++currIndex;
|
||||
if (currIndex == dimSet.size()) {
|
||||
lastVal = value;
|
||||
return;
|
||||
}
|
||||
currValue = dimSet.get(currIndex);
|
||||
}
|
||||
|
||||
if (Objects.equal(currValue, value)) {
|
||||
conversionBuf.put(index);
|
||||
++currIndex;
|
||||
if (currIndex == dimSet.size()) {
|
||||
lastVal = value;
|
||||
} else {
|
||||
currValue = dimSet.get(currIndex);
|
||||
}
|
||||
} else if (currValue.compareTo(value) < 0) {
|
||||
throw new ISE(
|
||||
"Skipped currValue[%s], currIndex[%,d]; incoming value[%s], index[%,d]", currValue, currIndex, value, index
|
||||
);
|
||||
IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
IntBuffer dimConversion = dimConversions.get(i).get(dimension);
|
||||
if (dimConversion != null) {
|
||||
seekers[i] = new IndexSeekerWithConversion((IntBuffer) dimConversion.asReadOnlyBuffer().rewind());
|
||||
} else {
|
||||
Indexed<String> dimValueLookup = adapters.get(i).getDimValueLookup(dimension);
|
||||
seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null ? 0 : dimValueLookup.size());
|
||||
}
|
||||
}
|
||||
return seekers;
|
||||
}
|
||||
|
||||
public IntBuffer getConversionBuffer()
|
||||
static interface IndexSeeker
|
||||
{
|
||||
int NOT_EXIST = -1;
|
||||
int NOT_INIT = -1;
|
||||
|
||||
int seek(int dictId);
|
||||
}
|
||||
|
||||
static class IndexSeekerWithoutConversion implements IndexSeeker
|
||||
{
|
||||
private final int limit;
|
||||
|
||||
public IndexSeekerWithoutConversion(int limit)
|
||||
{
|
||||
if (currIndex != conversionBuf.limit() || conversionBuf.hasRemaining()) {
|
||||
throw new ISE(
|
||||
"Asked for incomplete buffer. currIndex[%,d] != buf.limit[%,d]", currIndex, conversionBuf.limit()
|
||||
);
|
||||
}
|
||||
return (IntBuffer) conversionBuf.asReadOnlyBuffer().rewind();
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int seek(int dictId)
|
||||
{
|
||||
return dictId < limit ? dictId : NOT_EXIST;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get old dictId from new dictId, and only support access in order
|
||||
*/
|
||||
public static class DictIdSeeker
|
||||
static class IndexSeekerWithConversion implements IndexSeeker
|
||||
{
|
||||
static final int NOT_EXIST = -1;
|
||||
static final int NOT_INIT = -1;
|
||||
private final IntBuffer dimConversions;
|
||||
private int currIndex;
|
||||
private int currVal;
|
||||
private int lastVal;
|
||||
|
||||
DictIdSeeker(
|
||||
IntBuffer dimConversions
|
||||
)
|
||||
IndexSeekerWithConversion(IntBuffer dimConversions)
|
||||
{
|
||||
this.dimConversions = dimConversions;
|
||||
this.currIndex = 0;
|
||||
|
@ -1233,8 +1211,9 @@ public class IndexMerger
|
|||
}
|
||||
if (lastVal != NOT_INIT) {
|
||||
if (dictId <= lastVal) {
|
||||
throw new ISE("Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.",
|
||||
dictId, lastVal
|
||||
throw new ISE(
|
||||
"Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.",
|
||||
dictId, lastVal
|
||||
);
|
||||
}
|
||||
return NOT_EXIST;
|
||||
|
@ -1252,8 +1231,9 @@ public class IndexMerger
|
|||
}
|
||||
return ret;
|
||||
} else if (currVal < dictId) {
|
||||
throw new ISE("Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]",
|
||||
currVal, currIndex, dictId
|
||||
throw new ISE(
|
||||
"Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]",
|
||||
currVal, currIndex, dictId
|
||||
);
|
||||
} else {
|
||||
return NOT_EXIST;
|
||||
|
@ -1357,29 +1337,23 @@ public class IndexMerger
|
|||
int[][] newDims = new int[convertedDims.size()][];
|
||||
for (int i = 0; i < convertedDims.size(); ++i) {
|
||||
IntBuffer converter = converterArray[i];
|
||||
String dimName = convertedDims.get(i);
|
||||
|
||||
if (converter == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (i >= dims.length) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (dims[i] == null) {
|
||||
if (convertMissingDimsFlags.get(i)) {
|
||||
newDims[i] = EMPTY_STR_DIM;
|
||||
}
|
||||
if (dims[i] == null && convertMissingDimsFlags.get(i)) {
|
||||
newDims[i] = EMPTY_STR_DIM;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (converter == null) {
|
||||
newDims[i] = dims[i];
|
||||
continue;
|
||||
}
|
||||
|
||||
newDims[i] = new int[dims[i].length];
|
||||
|
||||
for (int j = 0; j < dims[i].length; ++j) {
|
||||
if (!converter.hasRemaining()) {
|
||||
log.error("Converter mismatch! wtfbbq!");
|
||||
}
|
||||
newDims[i][j] = converter.get(dims[i][j]);
|
||||
}
|
||||
}
|
||||
|
@ -1508,4 +1482,109 @@ public class IndexMerger
|
|||
}
|
||||
IndexIO.checkFileSize(metadataFile);
|
||||
}
|
||||
|
||||
static class DictionaryMergeIterator implements Iterator<String>
|
||||
{
|
||||
protected final IntBuffer[] conversions;
|
||||
protected final PriorityQueue<Pair<Integer, PeekingIterator<String>>> pQueue;
|
||||
|
||||
protected int counter;
|
||||
|
||||
DictionaryMergeIterator(Indexed<String>[] dimValueLookups, boolean useDirect)
|
||||
{
|
||||
pQueue = new PriorityQueue<>(
|
||||
dimValueLookups.length,
|
||||
new Comparator<Pair<Integer, PeekingIterator<String>>>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Pair<Integer, PeekingIterator<String>> lhs, Pair<Integer, PeekingIterator<String>> rhs)
|
||||
{
|
||||
return lhs.rhs.peek().compareTo(rhs.rhs.peek());
|
||||
}
|
||||
}
|
||||
);
|
||||
conversions = new IntBuffer[dimValueLookups.length];
|
||||
for (int i = 0; i < conversions.length; i++) {
|
||||
if (dimValueLookups[i] == null) {
|
||||
continue;
|
||||
}
|
||||
Indexed<String> indexed = dimValueLookups[i];
|
||||
if (useDirect) {
|
||||
conversions[i] = ByteBuffer.allocateDirect(indexed.size() * Ints.BYTES).asIntBuffer();
|
||||
} else {
|
||||
conversions[i] = IntBuffer.allocate(indexed.size());
|
||||
}
|
||||
|
||||
final PeekingIterator<String> iter = Iterators.peekingIterator(
|
||||
Iterators.transform(
|
||||
indexed.iterator(),
|
||||
new Function<String, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable String input)
|
||||
{
|
||||
return Strings.nullToEmpty(input);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
if (iter.hasNext()) {
|
||||
pQueue.add(Pair.of(i, iter));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return !pQueue.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next()
|
||||
{
|
||||
Pair<Integer, PeekingIterator<String>> smallest = pQueue.remove();
|
||||
if (smallest == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
final String value = writeTranslate(smallest, counter);
|
||||
|
||||
while (!pQueue.isEmpty() && value.equals(pQueue.peek().rhs.peek())) {
|
||||
writeTranslate(pQueue.remove(), counter);
|
||||
}
|
||||
counter++;
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
boolean needConversion(int index)
|
||||
{
|
||||
IntBuffer readOnly = conversions[index].asReadOnlyBuffer();
|
||||
readOnly.rewind();
|
||||
for (int i = 0; readOnly.hasRemaining(); i++) {
|
||||
if (i != readOnly.get()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private String writeTranslate(Pair<Integer, PeekingIterator<String>> smallest, int counter)
|
||||
{
|
||||
final int index = smallest.lhs;
|
||||
final String value = smallest.rhs.next();
|
||||
|
||||
conversions[index].put(counter);
|
||||
if (smallest.rhs.hasNext()) {
|
||||
pQueue.add(smallest);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove()
|
||||
{
|
||||
throw new UnsupportedOperationException("remove");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,6 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
|
@ -120,7 +119,6 @@ public class IndexMergerV9 extends IndexMerger
|
|||
adapters,
|
||||
new Function<IndexableAdapter, Metadata>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Metadata apply(IndexableAdapter input)
|
||||
{
|
||||
|
@ -187,7 +185,6 @@ public class IndexMergerV9 extends IndexMerger
|
|||
mergedDimensions,
|
||||
mergedMetrics,
|
||||
dimConversions,
|
||||
dimCardinalities,
|
||||
convertMissingDimsFlags,
|
||||
rowMergerFn
|
||||
);
|
||||
|
@ -526,15 +523,7 @@ public class IndexMergerV9 extends IndexMerger
|
|||
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
|
||||
}
|
||||
|
||||
DictIdSeeker[] dictIdSeeker = new DictIdSeeker[adapters.size()];
|
||||
for (int j = 0; j < adapters.size(); j++) {
|
||||
IntBuffer dimConversion = dimConversions.get(j).get(dimension);
|
||||
if (dimConversion != null) {
|
||||
dictIdSeeker[j] = new DictIdSeeker((IntBuffer)dimConversion.asReadOnlyBuffer().rewind());
|
||||
} else {
|
||||
dictIdSeeker[j] = new DictIdSeeker(null);
|
||||
}
|
||||
}
|
||||
IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension);
|
||||
|
||||
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
|
||||
nullRowsList.get(dimIndex)
|
||||
|
@ -546,7 +535,7 @@ public class IndexMergerV9 extends IndexMerger
|
|||
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
|
||||
for (int j = 0; j < adapters.size(); ++j) {
|
||||
int seekedDictId = dictIdSeeker[j].seek(dictId);
|
||||
if (seekedDictId != DictIdSeeker.NOT_EXIST) {
|
||||
if (seekedDictId != IndexSeeker.NOT_EXIST) {
|
||||
convertedInverteds.add(
|
||||
new ConvertingIndexedInts(
|
||||
adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
|
||||
|
@ -567,10 +556,9 @@ public class IndexMergerV9 extends IndexMerger
|
|||
|
||||
ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset);
|
||||
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
|
||||
bitmapIndexWriters.get(dimIndex).write(nullRowBitmap.union(bitmapToWrite));
|
||||
} else {
|
||||
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
|
||||
bitmapToWrite = nullRowBitmap.union(bitmapToWrite);
|
||||
}
|
||||
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
|
||||
|
||||
if (spatialIndexWriter != null) {
|
||||
String dimVal = dimVals.get(dictId);
|
||||
|
@ -794,78 +782,6 @@ public class IndexMergerV9 extends IndexMerger
|
|||
return dimWriters;
|
||||
}
|
||||
|
||||
private Iterable<Rowboat> makeRowIterable(
|
||||
final List<IndexableAdapter> adapters,
|
||||
final List<String> mergedDimensions,
|
||||
final List<String> mergedMetrics,
|
||||
final ArrayList<Map<String, IntBuffer>> dimConversions,
|
||||
final Map<String, Integer> dimCardinalities,
|
||||
final ArrayList<Boolean> convertMissingDimsFlags,
|
||||
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
|
||||
)
|
||||
{
|
||||
ArrayList<Iterable<Rowboat>> boats = Lists.newArrayListWithCapacity(adapters.size());
|
||||
|
||||
for (int i = 0; i < adapters.size(); ++i) {
|
||||
final IndexableAdapter adapter = adapters.get(i);
|
||||
|
||||
final int[] dimLookup = new int[mergedDimensions.size()];
|
||||
int count = 0;
|
||||
for (String dim : adapter.getDimensionNames()) {
|
||||
dimLookup[count] = mergedDimensions.indexOf(dim);
|
||||
count++;
|
||||
}
|
||||
|
||||
final int[] metricLookup = new int[mergedMetrics.size()];
|
||||
count = 0;
|
||||
for (String metric : adapter.getMetricNames()) {
|
||||
metricLookup[count] = mergedMetrics.indexOf(metric);
|
||||
count++;
|
||||
}
|
||||
|
||||
boats.add(
|
||||
new MMappedIndexRowIterable(
|
||||
Iterables.transform(
|
||||
adapters.get(i).getRows(),
|
||||
new Function<Rowboat, Rowboat>()
|
||||
{
|
||||
@Override
|
||||
public Rowboat apply(Rowboat input)
|
||||
{
|
||||
int[][] newDims = new int[mergedDimensions.size()][];
|
||||
int j = 0;
|
||||
for (int[] dim : input.getDims()) {
|
||||
newDims[dimLookup[j]] = dim;
|
||||
j++;
|
||||
}
|
||||
|
||||
Object[] newMetrics = new Object[mergedMetrics.size()];
|
||||
j = 0;
|
||||
for (Object met : input.getMetrics()) {
|
||||
newMetrics[metricLookup[j]] = met;
|
||||
j++;
|
||||
}
|
||||
|
||||
return new Rowboat(
|
||||
input.getTimestamp(),
|
||||
newDims,
|
||||
newMetrics,
|
||||
input.getRowNum()
|
||||
);
|
||||
}
|
||||
}
|
||||
),
|
||||
mergedDimensions,
|
||||
dimConversions.get(i),
|
||||
i,
|
||||
convertMissingDimsFlags
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return rowMergerFn.apply(boats);
|
||||
}
|
||||
|
||||
private ArrayList<GenericIndexedWriter<String>> setupDimValueWriters(
|
||||
final IOPeon ioPeon,
|
||||
final List<String> mergedDimensions
|
||||
|
@ -884,7 +800,7 @@ public class IndexMergerV9 extends IndexMerger
|
|||
}
|
||||
|
||||
private void writeDimValueAndSetupDimConversion(
|
||||
final List<IndexableAdapter> adapters,
|
||||
final List<IndexableAdapter> indexes,
|
||||
final ProgressIndicator progress,
|
||||
final List<String> mergedDimensions,
|
||||
final Map<String, Integer> dimensionCardinalities,
|
||||
|
@ -898,34 +814,29 @@ public class IndexMergerV9 extends IndexMerger
|
|||
final String section = "setup dimension conversions";
|
||||
progress.startSection(section);
|
||||
|
||||
for (int i = 0; i < adapters.size(); ++i) {
|
||||
for (int i = 0; i < indexes.size(); ++i) {
|
||||
dimConversions.add(Maps.<String, IntBuffer>newHashMap());
|
||||
}
|
||||
|
||||
for (int dimIndex = 0; dimIndex < mergedDimensions.size(); ++dimIndex) {
|
||||
long dimStartTime = System.currentTimeMillis();
|
||||
String dimension = mergedDimensions.get(dimIndex);
|
||||
|
||||
// lookups for all dimension values of this dimension
|
||||
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(adapters.size());
|
||||
|
||||
// each converter converts dim values of this dimension to global dictionary
|
||||
DimValueConverter[] converters = new DimValueConverter[adapters.size()];
|
||||
|
||||
boolean dimHasNull = false;
|
||||
boolean dimHasValues = false;
|
||||
boolean dimAbsentFromSomeIndex = false;
|
||||
boolean dimHasNull = false;
|
||||
boolean[] dimHasValuesByIndex = new boolean[adapters.size()];
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension);
|
||||
|
||||
int numMergeIndex = 0;
|
||||
Indexed<String> dimValueLookup = null;
|
||||
Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
|
||||
if (!isNullColumn(dimValues)) {
|
||||
dimHasValues = true;
|
||||
dimHasValuesByIndex[i] = true;
|
||||
dimValueLookups.add(dimValues);
|
||||
converters[i] = new DimValueConverter(dimValues);
|
||||
dimHasNull |= dimValues.indexOf(null) >= 0;
|
||||
dimValueLookups[i] = dimValueLookup = dimValues;
|
||||
numMergeIndex++;
|
||||
} else {
|
||||
dimAbsentFromSomeIndex = true;
|
||||
dimHasValuesByIndex[i] = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -939,58 +850,35 @@ public class IndexMergerV9 extends IndexMerger
|
|||
* later on, to allow rows from indexes without a particular dimension to merge correctly with
|
||||
* rows from indexes with null/empty str values for that dimension.
|
||||
*/
|
||||
if (convertMissingDims) {
|
||||
dimValueLookups.add(EMPTY_STR_DIM_VAL);
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
if (!dimHasValuesByIndex[i]) {
|
||||
converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL);
|
||||
}
|
||||
}
|
||||
if (convertMissingDims && !dimHasNull) {
|
||||
dimHasNull = true;
|
||||
dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
|
||||
numMergeIndex++;
|
||||
}
|
||||
|
||||
// sort all dimension values and treat all null values as empty strings
|
||||
Iterable<String> dimensionValues = CombiningIterable.createSplatted(
|
||||
Iterables.transform(
|
||||
dimValueLookups,
|
||||
new Function<Indexed<String>, Iterable<String>>()
|
||||
{
|
||||
@Override
|
||||
public Iterable<String> apply(@Nullable Indexed<String> indexed)
|
||||
{
|
||||
return Iterables.transform(
|
||||
indexed,
|
||||
new Function<String, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable String input)
|
||||
{
|
||||
return (input == null) ? "" : input;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
), Ordering.<String>natural().nullsFirst()
|
||||
);
|
||||
|
||||
GenericIndexedWriter<String> writer = dimValueWriters.get(dimIndex);
|
||||
int cardinality = 0;
|
||||
for (String value : dimensionValues) {
|
||||
value = value == null ? "" : value;
|
||||
writer.write(value);
|
||||
|
||||
if (value.length() == 0) {
|
||||
dimHasNull = true;
|
||||
int cardinality = 0;
|
||||
if (numMergeIndex > 1) {
|
||||
DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
writer.write(iterator.next());
|
||||
}
|
||||
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
DimValueConverter converter = converters[i];
|
||||
if (converter != null) {
|
||||
converter.convert(value, cardinality);
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
if (dimValueLookups[i] != null && iterator.needConversion(i)) {
|
||||
dimConversions.get(i).put(dimension, iterator.conversions[i]);
|
||||
}
|
||||
}
|
||||
++cardinality;
|
||||
cardinality = iterator.counter;
|
||||
} else if (numMergeIndex == 1) {
|
||||
for (String value : dimValueLookup) {
|
||||
writer.write(value);
|
||||
}
|
||||
cardinality = dimValueLookup.size();
|
||||
}
|
||||
|
||||
// Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later.
|
||||
dimHasNullFlags.add(dimHasNull);
|
||||
|
||||
|
@ -1009,14 +897,6 @@ public class IndexMergerV9 extends IndexMerger
|
|||
continue;
|
||||
}
|
||||
dimensionSkipFlag.add(false);
|
||||
|
||||
// make the conversion
|
||||
for (int i = 0; i < adapters.size(); ++i) {
|
||||
DimValueConverter converter = converters[i];
|
||||
if (converter != null) {
|
||||
dimConversions.get(i).put(dimension, converters[i].getConversionBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
progress.stopSection(section);
|
||||
}
|
||||
|
|
|
@ -46,9 +46,6 @@ public class SpatialFilter implements Filter
|
|||
public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector)
|
||||
{
|
||||
Iterable<ImmutableBitmap> search = selector.getSpatialIndex(dimension).search(bound);
|
||||
for (ImmutableBitmap immutableBitmap : search) {
|
||||
System.out.println(immutableBitmap);
|
||||
}
|
||||
return selector.getBitmapFactory().union(search);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import io.druid.segment.data.ArrayIndexed;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DictionaryMergeIteratorTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void basicTest()
|
||||
{
|
||||
// a b c d e f
|
||||
String[] s1 = {"a", "c", "d", "e"}; // 0 2 3 4
|
||||
String[] s2 = {"b", "c", "e"}; // 1 2 4
|
||||
String[] s3 = {"a", "d", "f"}; // 0 3 5
|
||||
String[] s4 = {"a", "b", "c"};
|
||||
String[] s5 = {"a", "b", "c", "d", "e", "f"};
|
||||
Indexed<String> i1 = new ArrayIndexed<String>(s1, String.class);
|
||||
Indexed<String> i2 = new ArrayIndexed<String>(s2, String.class);
|
||||
Indexed<String> i3 = new ArrayIndexed<String>(s3, String.class);
|
||||
Indexed<String> i4 = new ArrayIndexed<String>(s4, String.class);
|
||||
Indexed<String> i5 = new ArrayIndexed<String>(s5, String.class);
|
||||
|
||||
IndexMerger.DictionaryMergeIterator iterator = new IndexMerger.DictionaryMergeIterator(new Indexed[]{i1, i2, i3, i4, i5}, false);
|
||||
|
||||
Assert.assertArrayEquals(new String[]{"a", "b", "c", "d", "e", "f"}, Iterators.toArray(iterator, String.class));
|
||||
|
||||
Assert.assertArrayEquals(new int[] {0, 2, 3, 4}, iterator.conversions[0].array());
|
||||
Assert.assertArrayEquals(new int[] {1, 2, 4}, iterator.conversions[1].array());
|
||||
Assert.assertArrayEquals(new int[] {0, 3, 5}, iterator.conversions[2].array());
|
||||
Assert.assertArrayEquals(new int[] {0, 1, 2}, iterator.conversions[3].array());
|
||||
Assert.assertArrayEquals(new int[] {0, 1, 2, 3, 4, 5}, iterator.conversions[4].array());
|
||||
|
||||
Assert.assertTrue(iterator.needConversion(0));
|
||||
Assert.assertTrue(iterator.needConversion(1));
|
||||
Assert.assertTrue(iterator.needConversion(2));
|
||||
Assert.assertFalse(iterator.needConversion(3));
|
||||
Assert.assertFalse(iterator.needConversion(4));
|
||||
}
|
||||
}
|
|
@ -1668,7 +1668,9 @@ public class IndexMergerTest
|
|||
dimConversions.put(0);
|
||||
dimConversions.put(2);
|
||||
dimConversions.put(4);
|
||||
IndexMerger.DictIdSeeker dictIdSeeker = new IndexMerger.DictIdSeeker((IntBuffer) dimConversions.asReadOnlyBuffer().rewind());
|
||||
IndexMerger.IndexSeeker dictIdSeeker = new IndexMerger.IndexSeekerWithConversion(
|
||||
(IntBuffer) dimConversions.asReadOnlyBuffer().rewind()
|
||||
);
|
||||
Assert.assertEquals(0, dictIdSeeker.seek(0));
|
||||
Assert.assertEquals(-1, dictIdSeeker.seek(1));
|
||||
Assert.assertEquals(1, dictIdSeeker.seek(2));
|
||||
|
|
Loading…
Reference in New Issue