mirror of https://github.com/apache/druid.git
More specific null/empty str handling in IndexMerger
This commit is contained in:
parent
fc09929503
commit
459a236067
|
@ -627,6 +627,7 @@ public class IndexMerger
|
|||
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||
Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
|
||||
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
|
||||
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||
|
||||
for (IndexableAdapter index : indexes) {
|
||||
dimConversions.add(Maps.<String, IntBuffer>newHashMap());
|
||||
|
@ -641,6 +642,7 @@ public class IndexMerger
|
|||
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1);
|
||||
DimValueConverter[] converters = new DimValueConverter[indexes.size()];
|
||||
boolean dimHasValues = false;
|
||||
boolean dimAbsentFromSomeIndex = false;
|
||||
boolean[] dimHasValuesByIndex = new boolean[indexes.size()];
|
||||
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
|
@ -651,17 +653,22 @@ public class IndexMerger
|
|||
dimValueLookups.add(dimValues);
|
||||
converters[i] = new DimValueConverter(dimValues);
|
||||
} else {
|
||||
dimAbsentFromSomeIndex = true;
|
||||
dimHasValuesByIndex[i] = false;
|
||||
}
|
||||
}
|
||||
|
||||
boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
|
||||
convertMissingDimsFlags.add(convertMissingDims);
|
||||
|
||||
/*
|
||||
* Ensure the empty str is always in the dictionary if column is not null across indexes
|
||||
* Ensure the empty str is always in the dictionary if the dimension was missing from one index but
|
||||
* has non-null values in another index.
|
||||
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings
|
||||
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with
|
||||
* rows from indexes with partial null values for that dimension.
|
||||
* later on, to allow rows from indexes without a particular dimension to merge correctly with
|
||||
* rows from indexes with null/empty str values for that dimension.
|
||||
*/
|
||||
if (dimHasValues) {
|
||||
if (convertMissingDims) {
|
||||
dimValueLookups.add(EMPTY_STR_DIM_VAL);
|
||||
for (int i = 0; i < indexes.size(); i++) {
|
||||
if (!dimHasValuesByIndex[i]) {
|
||||
|
@ -786,7 +793,7 @@ public class IndexMerger
|
|||
}
|
||||
),
|
||||
mergedDimensions, dimConversions.get(i), i,
|
||||
dimensionCardinalities
|
||||
convertMissingDimsFlags
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -1220,7 +1227,7 @@ public class IndexMerger
|
|||
private final List<String> convertedDims;
|
||||
private final Map<String, IntBuffer> converters;
|
||||
private final int indexNumber;
|
||||
private final Map<String, Integer> dimCardinalities;
|
||||
private final ArrayList<Boolean> convertMissingDimsFlags;
|
||||
private static final int[] EMPTY_STR_DIM = new int[]{0};
|
||||
|
||||
MMappedIndexRowIterable(
|
||||
|
@ -1228,14 +1235,14 @@ public class IndexMerger
|
|||
List<String> convertedDims,
|
||||
Map<String, IntBuffer> converters,
|
||||
int indexNumber,
|
||||
Map<String, Integer> dimCardinalities
|
||||
ArrayList<Boolean> convertMissingDimsFlags
|
||||
)
|
||||
{
|
||||
this.index = index;
|
||||
this.convertedDims = convertedDims;
|
||||
this.converters = converters;
|
||||
this.indexNumber = indexNumber;
|
||||
this.dimCardinalities = dimCardinalities;
|
||||
this.convertMissingDimsFlags = convertMissingDimsFlags;
|
||||
}
|
||||
|
||||
public Iterable<Rowboat> getIndex()
|
||||
|
@ -1280,7 +1287,7 @@ public class IndexMerger
|
|||
}
|
||||
|
||||
if (dims[i] == null) {
|
||||
if (dimCardinalities.get(dimName) > 0) {
|
||||
if (convertMissingDimsFlags.get(i)) {
|
||||
newDims[i] = EMPTY_STR_DIM;
|
||||
}
|
||||
continue;
|
||||
|
|
|
@ -49,7 +49,6 @@ import io.druid.segment.column.ColumnCapabilities;
|
|||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ColumnDescriptor;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.ArrayIndexed;
|
||||
import io.druid.segment.data.BitmapSerdeFactory;
|
||||
import io.druid.segment.data.ByteBufferWriter;
|
||||
import io.druid.segment.data.CompressedObjectStrategy;
|
||||
|
@ -175,15 +174,24 @@ public class IndexMergerV9 extends IndexMerger
|
|||
final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
|
||||
final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
|
||||
final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||
final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||
writeDimValueAndSetupDimConversion(
|
||||
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions
|
||||
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
|
||||
convertMissingDimsFlags, dimHasNullFlags
|
||||
);
|
||||
log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);
|
||||
|
||||
/************* Walk through data sets, merge them, and write merged columns *************/
|
||||
progress.progress();
|
||||
final Iterable<Rowboat> theRows = makeRowIterable(
|
||||
adapters, mergedDimensions, mergedMetrics, dimConversions, dimCardinalities, rowMergerFn
|
||||
adapters,
|
||||
mergedDimensions,
|
||||
mergedMetrics,
|
||||
dimConversions,
|
||||
dimCardinalities,
|
||||
convertMissingDimsFlags,
|
||||
rowMergerFn
|
||||
);
|
||||
final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
|
||||
final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
|
||||
|
@ -199,7 +207,7 @@ public class IndexMergerV9 extends IndexMerger
|
|||
}
|
||||
mergeIndexesAndWriteColumns(
|
||||
adapters, progress, theRows, timeWriter, dimWriters, metWriters,
|
||||
dimensionSkipFlag, rowNumConversions, nullRowsList
|
||||
dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
|
||||
);
|
||||
|
||||
/************ Create Inverted Indexes *************/
|
||||
|
@ -527,9 +535,6 @@ public class IndexMergerV9 extends IndexMerger
|
|||
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
|
||||
nullRowsList.get(dimIndex)
|
||||
);
|
||||
if (Iterables.getFirst(dimVals, "") != null && !nullRowsList.get(dimIndex).isEmpty()) {
|
||||
bitmapIndexWriters.get(dimIndex).write(nullRowBitmap);
|
||||
}
|
||||
|
||||
for (String dimVal : IndexedIterable.create(dimVals)) {
|
||||
progress.progress();
|
||||
|
@ -636,7 +641,8 @@ public class IndexMergerV9 extends IndexMerger
|
|||
final ArrayList<GenericColumnSerializer> metWriters,
|
||||
final ArrayList<Boolean> dimensionSkipFlag,
|
||||
final List<IntBuffer> rowNumConversions,
|
||||
final ArrayList<MutableBitmap> nullRowsList
|
||||
final ArrayList<MutableBitmap> nullRowsList,
|
||||
final ArrayList<Boolean> dimHasNullFlags
|
||||
) throws IOException
|
||||
{
|
||||
final String section = "walk through and merge rows";
|
||||
|
@ -665,7 +671,11 @@ public class IndexMergerV9 extends IndexMerger
|
|||
if (dimensionSkipFlag.get(i)) {
|
||||
continue;
|
||||
}
|
||||
if (dims[i] == null || dims[i].length == 0 || (dims[i].length == 1 && dims[i][0] == 0)) {
|
||||
if (dims[i] == null || dims[i].length == 0) {
|
||||
nullRowsList.get(i).add(rowCount);
|
||||
} else if (dimHasNullFlags.get(i) && dims[i].length == 1 && dims[i][0] == 0) {
|
||||
// If this dimension has the null/empty str in its dictionary, a row with a single-valued dimension
|
||||
// that matches the null/empty str's dictionary ID should also be added to nullRowsList.
|
||||
nullRowsList.get(i).add(rowCount);
|
||||
}
|
||||
dimWriters.get(i).add(dims[i]);
|
||||
|
@ -779,6 +789,7 @@ public class IndexMergerV9 extends IndexMerger
|
|||
final List<String> mergedMetrics,
|
||||
final ArrayList<Map<String, IntBuffer>> dimConversions,
|
||||
final Map<String, Integer> dimCardinalities,
|
||||
final ArrayList<Boolean> convertMissingDimsFlags,
|
||||
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
|
||||
)
|
||||
{
|
||||
|
@ -836,7 +847,7 @@ public class IndexMergerV9 extends IndexMerger
|
|||
mergedDimensions,
|
||||
dimConversions.get(i),
|
||||
i,
|
||||
dimCardinalities
|
||||
convertMissingDimsFlags
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -868,7 +879,9 @@ public class IndexMergerV9 extends IndexMerger
|
|||
final Map<String, Integer> dimensionCardinalities,
|
||||
final ArrayList<GenericIndexedWriter<String>> dimValueWriters,
|
||||
final ArrayList<Boolean> dimensionSkipFlag,
|
||||
final List<Map<String, IntBuffer>> dimConversions
|
||||
final List<Map<String, IntBuffer>> dimConversions,
|
||||
final ArrayList<Boolean> convertMissingDimsFlags,
|
||||
final ArrayList<Boolean> dimHasNullFlags
|
||||
) throws IOException
|
||||
{
|
||||
final String section = "setup dimension conversions";
|
||||
|
@ -889,6 +902,8 @@ public class IndexMergerV9 extends IndexMerger
|
|||
DimValueConverter[] converters = new DimValueConverter[adapters.size()];
|
||||
|
||||
boolean dimHasValues = false;
|
||||
boolean dimAbsentFromSomeIndex = false;
|
||||
boolean dimHasNull = false;
|
||||
boolean[] dimHasValuesByIndex = new boolean[adapters.size()];
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension);
|
||||
|
@ -898,17 +913,22 @@ public class IndexMergerV9 extends IndexMerger
|
|||
dimValueLookups.add(dimValues);
|
||||
converters[i] = new DimValueConverter(dimValues);
|
||||
} else {
|
||||
dimAbsentFromSomeIndex = true;
|
||||
dimHasValuesByIndex[i] = false;
|
||||
}
|
||||
}
|
||||
|
||||
boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
|
||||
convertMissingDimsFlags.add(convertMissingDims);
|
||||
|
||||
/*
|
||||
* Ensure the empty str is always in the dictionary if column is not null across indexes
|
||||
* Ensure the empty str is always in the dictionary if the dimension was missing from one index but
|
||||
* has non-null values in another index.
|
||||
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings
|
||||
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with
|
||||
* rows from indexes with partial null values for that dimension.
|
||||
* later on, to allow rows from indexes without a particular dimension to merge correctly with
|
||||
* rows from indexes with null/empty str values for that dimension.
|
||||
*/
|
||||
if (dimHasValues) {
|
||||
if (convertMissingDims) {
|
||||
dimValueLookups.add(EMPTY_STR_DIM_VAL);
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
if (!dimHasValuesByIndex[i]) {
|
||||
|
@ -948,6 +968,10 @@ public class IndexMergerV9 extends IndexMerger
|
|||
value = value == null ? "" : value;
|
||||
writer.write(value);
|
||||
|
||||
if (value.length() == 0) {
|
||||
dimHasNull = true;
|
||||
}
|
||||
|
||||
for (int i = 0; i < adapters.size(); i++) {
|
||||
DimValueConverter converter = converters[i];
|
||||
if (converter != null) {
|
||||
|
@ -956,6 +980,8 @@ public class IndexMergerV9 extends IndexMerger
|
|||
}
|
||||
++cardinality;
|
||||
}
|
||||
// Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later.
|
||||
dimHasNullFlags.add(dimHasNull);
|
||||
|
||||
log.info(
|
||||
"Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
|
||||
|
|
|
@ -104,7 +104,7 @@ public class SegmentMetadataQueryTest
|
|||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
10881,
|
||||
2,
|
||||
1,
|
||||
null
|
||||
)
|
||||
), 71982,
|
||||
|
@ -135,7 +135,7 @@ public class SegmentMetadataQueryTest
|
|||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
21762,
|
||||
2,
|
||||
1,
|
||||
null
|
||||
)
|
||||
),
|
||||
|
|
|
@ -217,8 +217,8 @@ public class IndexMergerTest
|
|||
}
|
||||
|
||||
Assert.assertEquals(2, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims());
|
||||
|
||||
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1"));
|
||||
|
@ -845,11 +845,11 @@ public class IndexMergerTest
|
|||
|
||||
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
|
||||
Assert.assertEquals(3, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics());
|
||||
Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims());
|
||||
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());
|
||||
|
||||
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d3", ""));
|
||||
|
@ -1189,10 +1189,10 @@ public class IndexMergerTest
|
|||
ImmutableList.copyOf(adapter.getDimensionNames())
|
||||
);
|
||||
Assert.assertEquals(4, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims());
|
||||
|
||||
checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", ""));
|
||||
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210"));
|
||||
|
|
Loading…
Reference in New Issue