Merge pull request #2306 from jon-wei/inherit2

More specific null/empty str handling in IndexMerger
This commit is contained in:
Fangjin Yang 2016-01-21 14:36:09 -08:00
commit 3f998117a6
4 changed files with 68 additions and 35 deletions

View File

@ -627,6 +627,7 @@ public class IndexMerger
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size()); ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
Map<String, Integer> dimensionCardinalities = Maps.newHashMap(); Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size()); ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (IndexableAdapter index : indexes) { for (IndexableAdapter index : indexes) {
dimConversions.add(Maps.<String, IntBuffer>newHashMap()); dimConversions.add(Maps.<String, IntBuffer>newHashMap());
@ -641,6 +642,7 @@ public class IndexMerger
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1); List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1);
DimValueConverter[] converters = new DimValueConverter[indexes.size()]; DimValueConverter[] converters = new DimValueConverter[indexes.size()];
boolean dimHasValues = false; boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false;
boolean[] dimHasValuesByIndex = new boolean[indexes.size()]; boolean[] dimHasValuesByIndex = new boolean[indexes.size()];
for (int i = 0; i < indexes.size(); i++) { for (int i = 0; i < indexes.size(); i++) {
@ -651,17 +653,22 @@ public class IndexMerger
dimValueLookups.add(dimValues); dimValueLookups.add(dimValues);
converters[i] = new DimValueConverter(dimValues); converters[i] = new DimValueConverter(dimValues);
} else { } else {
dimAbsentFromSomeIndex = true;
dimHasValuesByIndex[i] = false; dimHasValuesByIndex[i] = false;
} }
} }
boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
convertMissingDimsFlags.add(convertMissingDims);
/* /*
* Ensure the empty str is always in the dictionary if column is not null across indexes * Ensure the empty str is always in the dictionary if the dimension was missing from one index but
* has non-null values in another index.
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings * This is done so that MMappedIndexRowIterable can convert null columns to empty strings
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with * later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with partial null values for that dimension. * rows from indexes with null/empty str values for that dimension.
*/ */
if (dimHasValues) { if (convertMissingDims) {
dimValueLookups.add(EMPTY_STR_DIM_VAL); dimValueLookups.add(EMPTY_STR_DIM_VAL);
for (int i = 0; i < indexes.size(); i++) { for (int i = 0; i < indexes.size(); i++) {
if (!dimHasValuesByIndex[i]) { if (!dimHasValuesByIndex[i]) {
@ -786,7 +793,7 @@ public class IndexMerger
} }
), ),
mergedDimensions, dimConversions.get(i), i, mergedDimensions, dimConversions.get(i), i,
dimensionCardinalities convertMissingDimsFlags
) )
); );
} }
@ -1220,7 +1227,7 @@ public class IndexMerger
private final List<String> convertedDims; private final List<String> convertedDims;
private final Map<String, IntBuffer> converters; private final Map<String, IntBuffer> converters;
private final int indexNumber; private final int indexNumber;
private final Map<String, Integer> dimCardinalities; private final ArrayList<Boolean> convertMissingDimsFlags;
private static final int[] EMPTY_STR_DIM = new int[]{0}; private static final int[] EMPTY_STR_DIM = new int[]{0};
MMappedIndexRowIterable( MMappedIndexRowIterable(
@ -1228,14 +1235,14 @@ public class IndexMerger
List<String> convertedDims, List<String> convertedDims,
Map<String, IntBuffer> converters, Map<String, IntBuffer> converters,
int indexNumber, int indexNumber,
Map<String, Integer> dimCardinalities ArrayList<Boolean> convertMissingDimsFlags
) )
{ {
this.index = index; this.index = index;
this.convertedDims = convertedDims; this.convertedDims = convertedDims;
this.converters = converters; this.converters = converters;
this.indexNumber = indexNumber; this.indexNumber = indexNumber;
this.dimCardinalities = dimCardinalities; this.convertMissingDimsFlags = convertMissingDimsFlags;
} }
public Iterable<Rowboat> getIndex() public Iterable<Rowboat> getIndex()
@ -1280,7 +1287,7 @@ public class IndexMerger
} }
if (dims[i] == null) { if (dims[i] == null) {
if (dimCardinalities.get(dimName) > 0) { if (convertMissingDimsFlags.get(i)) {
newDims[i] = EMPTY_STR_DIM; newDims[i] = EMPTY_STR_DIM;
} }
continue; continue;

View File

@ -49,7 +49,6 @@ import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ColumnDescriptor; import io.druid.segment.column.ColumnDescriptor;
import io.druid.segment.column.ValueType; import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayIndexed;
import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferWriter; import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedObjectStrategy;
@ -175,15 +174,24 @@ public class IndexMergerV9 extends IndexMerger
final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions); final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size()); final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size()); final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
writeDimValueAndSetupDimConversion( writeDimValueAndSetupDimConversion(
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
convertMissingDimsFlags, dimHasNullFlags
); );
log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime); log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);
/************* Walk through data sets, merge them, and write merged columns *************/ /************* Walk through data sets, merge them, and write merged columns *************/
progress.progress(); progress.progress();
final Iterable<Rowboat> theRows = makeRowIterable( final Iterable<Rowboat> theRows = makeRowIterable(
adapters, mergedDimensions, mergedMetrics, dimConversions, dimCardinalities, rowMergerFn adapters,
mergedDimensions,
mergedMetrics,
dimConversions,
dimCardinalities,
convertMissingDimsFlags,
rowMergerFn
); );
final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon); final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters( final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
@ -199,7 +207,7 @@ public class IndexMergerV9 extends IndexMerger
} }
mergeIndexesAndWriteColumns( mergeIndexesAndWriteColumns(
adapters, progress, theRows, timeWriter, dimWriters, metWriters, adapters, progress, theRows, timeWriter, dimWriters, metWriters,
dimensionSkipFlag, rowNumConversions, nullRowsList dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
); );
/************ Create Inverted Indexes *************/ /************ Create Inverted Indexes *************/
@ -527,9 +535,6 @@ public class IndexMergerV9 extends IndexMerger
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
nullRowsList.get(dimIndex) nullRowsList.get(dimIndex)
); );
if (Iterables.getFirst(dimVals, "") != null && !nullRowsList.get(dimIndex).isEmpty()) {
bitmapIndexWriters.get(dimIndex).write(nullRowBitmap);
}
for (String dimVal : IndexedIterable.create(dimVals)) { for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress(); progress.progress();
@ -636,7 +641,8 @@ public class IndexMergerV9 extends IndexMerger
final ArrayList<GenericColumnSerializer> metWriters, final ArrayList<GenericColumnSerializer> metWriters,
final ArrayList<Boolean> dimensionSkipFlag, final ArrayList<Boolean> dimensionSkipFlag,
final List<IntBuffer> rowNumConversions, final List<IntBuffer> rowNumConversions,
final ArrayList<MutableBitmap> nullRowsList final ArrayList<MutableBitmap> nullRowsList,
final ArrayList<Boolean> dimHasNullFlags
) throws IOException ) throws IOException
{ {
final String section = "walk through and merge rows"; final String section = "walk through and merge rows";
@ -665,7 +671,11 @@ public class IndexMergerV9 extends IndexMerger
if (dimensionSkipFlag.get(i)) { if (dimensionSkipFlag.get(i)) {
continue; continue;
} }
if (dims[i] == null || dims[i].length == 0 || (dims[i].length == 1 && dims[i][0] == 0)) { if (dims[i] == null || dims[i].length == 0) {
nullRowsList.get(i).add(rowCount);
} else if (dimHasNullFlags.get(i) && dims[i].length == 1 && dims[i][0] == 0) {
// If this dimension has the null/empty str in its dictionary, a row with a single-valued dimension
// that matches the null/empty str's dictionary ID should also be added to nullRowsList.
nullRowsList.get(i).add(rowCount); nullRowsList.get(i).add(rowCount);
} }
dimWriters.get(i).add(dims[i]); dimWriters.get(i).add(dims[i]);
@ -779,6 +789,7 @@ public class IndexMergerV9 extends IndexMerger
final List<String> mergedMetrics, final List<String> mergedMetrics,
final ArrayList<Map<String, IntBuffer>> dimConversions, final ArrayList<Map<String, IntBuffer>> dimConversions,
final Map<String, Integer> dimCardinalities, final Map<String, Integer> dimCardinalities,
final ArrayList<Boolean> convertMissingDimsFlags,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
) )
{ {
@ -836,7 +847,7 @@ public class IndexMergerV9 extends IndexMerger
mergedDimensions, mergedDimensions,
dimConversions.get(i), dimConversions.get(i),
i, i,
dimCardinalities convertMissingDimsFlags
) )
); );
} }
@ -868,7 +879,9 @@ public class IndexMergerV9 extends IndexMerger
final Map<String, Integer> dimensionCardinalities, final Map<String, Integer> dimensionCardinalities,
final ArrayList<GenericIndexedWriter<String>> dimValueWriters, final ArrayList<GenericIndexedWriter<String>> dimValueWriters,
final ArrayList<Boolean> dimensionSkipFlag, final ArrayList<Boolean> dimensionSkipFlag,
final List<Map<String, IntBuffer>> dimConversions final List<Map<String, IntBuffer>> dimConversions,
final ArrayList<Boolean> convertMissingDimsFlags,
final ArrayList<Boolean> dimHasNullFlags
) throws IOException ) throws IOException
{ {
final String section = "setup dimension conversions"; final String section = "setup dimension conversions";
@ -889,6 +902,8 @@ public class IndexMergerV9 extends IndexMerger
DimValueConverter[] converters = new DimValueConverter[adapters.size()]; DimValueConverter[] converters = new DimValueConverter[adapters.size()];
boolean dimHasValues = false; boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false;
boolean dimHasNull = false;
boolean[] dimHasValuesByIndex = new boolean[adapters.size()]; boolean[] dimHasValuesByIndex = new boolean[adapters.size()];
for (int i = 0; i < adapters.size(); i++) { for (int i = 0; i < adapters.size(); i++) {
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension); Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension);
@ -898,17 +913,22 @@ public class IndexMergerV9 extends IndexMerger
dimValueLookups.add(dimValues); dimValueLookups.add(dimValues);
converters[i] = new DimValueConverter(dimValues); converters[i] = new DimValueConverter(dimValues);
} else { } else {
dimAbsentFromSomeIndex = true;
dimHasValuesByIndex[i] = false; dimHasValuesByIndex[i] = false;
} }
} }
boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
convertMissingDimsFlags.add(convertMissingDims);
/* /*
* Ensure the empty str is always in the dictionary if column is not null across indexes * Ensure the empty str is always in the dictionary if the dimension was missing from one index but
* has non-null values in another index.
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings * This is done so that MMappedIndexRowIterable can convert null columns to empty strings
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with * later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with partial null values for that dimension. * rows from indexes with null/empty str values for that dimension.
*/ */
if (dimHasValues) { if (convertMissingDims) {
dimValueLookups.add(EMPTY_STR_DIM_VAL); dimValueLookups.add(EMPTY_STR_DIM_VAL);
for (int i = 0; i < adapters.size(); i++) { for (int i = 0; i < adapters.size(); i++) {
if (!dimHasValuesByIndex[i]) { if (!dimHasValuesByIndex[i]) {
@ -948,6 +968,10 @@ public class IndexMergerV9 extends IndexMerger
value = value == null ? "" : value; value = value == null ? "" : value;
writer.write(value); writer.write(value);
if (value.length() == 0) {
dimHasNull = true;
}
for (int i = 0; i < adapters.size(); i++) { for (int i = 0; i < adapters.size(); i++) {
DimValueConverter converter = converters[i]; DimValueConverter converter = converters[i];
if (converter != null) { if (converter != null) {
@ -956,6 +980,8 @@ public class IndexMergerV9 extends IndexMerger
} }
++cardinality; ++cardinality;
} }
// Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later.
dimHasNullFlags.add(dimHasNull);
log.info( log.info(
"Completed dim[%s] conversions with cardinality[%,d] in %,d millis.", "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",

View File

@ -104,7 +104,7 @@ public class SegmentMetadataQueryTest
new ColumnAnalysis( new ColumnAnalysis(
ValueType.STRING.toString(), ValueType.STRING.toString(),
10881, 10881,
2, 1,
null null
) )
), 71982, ), 71982,
@ -135,7 +135,7 @@ public class SegmentMetadataQueryTest
new ColumnAnalysis( new ColumnAnalysis(
ValueType.STRING.toString(), ValueType.STRING.toString(),
21762, 21762,
2, 1,
null null
) )
), ),

View File

@ -222,8 +222,8 @@ public class IndexMergerTest
} }
Assert.assertEquals(2, boatList.size()); Assert.assertEquals(2, boatList.size());
Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims());
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", "")); checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1")); checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1"));
@ -850,11 +850,11 @@ public class IndexMergerTest
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(3, boatList.size()); Assert.assertEquals(3, boatList.size());
Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics()); Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims()); Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics()); Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d3", "")); checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d3", ""));
@ -1194,10 +1194,10 @@ public class IndexMergerTest
ImmutableList.copyOf(adapter.getDimensionNames()) ImmutableList.copyOf(adapter.getDimensionNames())
); );
Assert.assertEquals(4, boatList.size()); Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims());
checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", "")); checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210"));