fix thread safety issue with nested column global dictionaries (#13265)

* fix thread safety issue with nested column global dictionaries

* missing float

* clarify javadocs thread safety
This commit is contained in:
Clint Wylie 2022-10-27 17:58:24 -07:00 committed by GitHub
parent affc522b9f
commit acb9cb0227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 627 additions and 143 deletions

View File

@ -250,6 +250,12 @@ public class TypeStrategies
return buffer.getLong();
}
@Override
public Long read(ByteBuffer buffer, int offset)
{
return buffer.getLong(offset);
}
@Override
public boolean readRetainsBufferReference()
{
@ -297,6 +303,12 @@ public class TypeStrategies
return buffer.getFloat();
}
@Override
public Float read(ByteBuffer buffer, int offset)
{
return buffer.getFloat(offset);
}
@Override
public boolean readRetainsBufferReference()
{
@ -344,6 +356,12 @@ public class TypeStrategies
return buffer.getDouble();
}
@Override
public Double read(ByteBuffer buffer, int offset)
{
return buffer.getDouble(offset);
}
@Override
public boolean readRetainsBufferReference()
{

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
import org.apache.druid.segment.nested.GlobalDimensionDictionary;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
@ -38,6 +39,7 @@ import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.nested.StructuredDataProcessor;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
@ -224,6 +226,23 @@ public class NestedDataColumnIndexer implements DimensionIndexer<StructuredData,
throw new UnsupportedOperationException("Not supported");
}
public void mergeFields(SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields)
{
for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : fieldIndexers.entrySet()) {
// skip adding the field if no types are in the set, meaning only null values have been processed
if (!entry.getValue().getTypes().isEmpty()) {
mergedFields.put(entry.getKey(), entry.getValue().getTypes());
}
}
}
public GlobalDictionarySortedCollector getSortedCollector()
{
return globalDictionary.getSortedCollector();
}
static class LiteralFieldIndexer
{
private final GlobalDimensionDictionary globalDimensionDictionary;

View File

@ -45,7 +45,6 @@ import java.io.IOException;
import java.nio.IntBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@ -164,13 +163,8 @@ public class NestedDataColumnMerger implements DimensionMergerV9
return null;
}
final NestedDataColumnIndexer indexer = (NestedDataColumnIndexer) dim.getIndexer();
for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry : indexer.fieldIndexers.entrySet()) {
// skip adding the field if no types are in the set, meaning only null values have been processed
if (!entry.getValue().getTypes().isEmpty()) {
mergedFields.put(entry.getKey(), entry.getValue().getTypes());
}
}
return indexer.globalDictionary.getSortedCollector();
indexer.mergeFields(mergedFields);
return indexer.getSortedCollector();
}
@Nullable

View File

@ -38,7 +38,6 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<Compr
SmooshedFileMapper mapper
) throws IOException
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
final int numElements = buffer.getInt();

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.data;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.TypeStrategy;
@ -43,12 +44,14 @@ import java.util.Iterator;
* If {@link #hasNull} is set, id 0 is ALWAYS null, so the comparator should be 'nulls first' or else behavior will
* be unexpected. {@link #hasNull} can only be set if also {@link #isSorted} is set, since the null value is not
* actually stored in the values section.
*
* This class is thread-safe if and only if {@link TypeStrategy#read(ByteBuffer, int)} is thread-safe.
*/
public class FixedIndexed<T> implements Indexed<T>
{
public static final byte IS_SORTED_MASK = 0x02;
public static <T> FixedIndexed<T> read(ByteBuffer bb, TypeStrategy<T> strategy, ByteOrder byteOrder, int width)
public static <T> Supplier<FixedIndexed<T>> read(ByteBuffer bb, TypeStrategy<T> strategy, ByteOrder byteOrder, int width)
{
final ByteBuffer buffer = bb.asReadOnlyBuffer().order(byteOrder);
final byte version = buffer.get();
@ -59,8 +62,9 @@ public class FixedIndexed<T> implements Indexed<T>
Preconditions.checkState(!(hasNull && !isSorted), "cannot have null values if not sorted");
final int size = buffer.getInt() + (hasNull ? 1 : 0);
final int valuesOffset = buffer.position();
final FixedIndexed<T> fixedIndexed = new FixedIndexed<>(
buffer,
final Supplier<FixedIndexed<T>> fixedIndexed = () -> new FixedIndexed<>(
bb,
byteOrder,
strategy,
hasNull,
isSorted,
@ -68,6 +72,7 @@ public class FixedIndexed<T> implements Indexed<T>
size,
valuesOffset
);
bb.position(buffer.position() + (width * size));
return fixedIndexed;
}
@ -83,6 +88,7 @@ public class FixedIndexed<T> implements Indexed<T>
private FixedIndexed(
ByteBuffer buffer,
ByteOrder byteOrder,
TypeStrategy<T> typeStrategy,
boolean hasNull,
boolean isSorted,
@ -91,7 +97,7 @@ public class FixedIndexed<T> implements Indexed<T>
int valuesOffset
)
{
this.buffer = buffer;
this.buffer = buffer.asReadOnlyBuffer().order(byteOrder);
this.typeStrategy = typeStrategy;
Preconditions.checkArgument(width > 0, "FixedIndexed requires a fixed width value type");
this.width = width;

View File

@ -67,6 +67,7 @@ import java.util.NoSuchElementException;
* The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
* bucket before moving onto the next bucket as the iterator is consumed.
*
* This class is not thread-safe since during operation modifies positions of a shared buffer.
*/
public final class FrontCodedIndexed implements Indexed<ByteBuffer>
{

View File

@ -469,7 +469,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
/**
* Single-threaded view.
*/
abstract class BufferIndexed implements Indexed<T>
public abstract class BufferIndexed implements Indexed<T>
{
int lastReadSize;

View File

@ -87,9 +87,9 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
private final GenericIndexed<String> fields;
private final NestedLiteralTypeInfo fieldInfo;
private final TStringDictionary stringDictionary;
private final FixedIndexed<Long> longDictionary;
private final FixedIndexed<Double> doubleDictionary;
private final Supplier<TStringDictionary> stringDictionarySupplier;
private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
private final SmooshedFileMapper fileMapper;
private final ConcurrentHashMap<String, ColumnHolder> columns = new ConcurrentHashMap<>();
@ -103,9 +103,9 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
ImmutableBitmap nullValues,
GenericIndexed<String> fields,
NestedLiteralTypeInfo fieldInfo,
TStringDictionary stringDictionary,
FixedIndexed<Long> longDictionary,
FixedIndexed<Double> doubleDictionary,
Supplier<TStringDictionary> stringDictionary,
Supplier<FixedIndexed<Long>> longDictionarySupplier,
Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
SmooshedFileMapper fileMapper
)
{
@ -113,9 +113,9 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
this.nullValues = nullValues;
this.fields = fields;
this.fieldInfo = fieldInfo;
this.stringDictionary = stringDictionary;
this.longDictionary = longDictionary;
this.doubleDictionary = doubleDictionary;
this.stringDictionarySupplier = stringDictionary;
this.longDictionarySupplier = longDictionarySupplier;
this.doubleDictionarySupplier = doubleDictionarySupplier;
this.fileMapper = fileMapper;
this.closer = Closer.create();
this.compressedRawColumnSupplier = compressedRawColumnSupplier;
@ -133,17 +133,17 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
public TStringDictionary getStringDictionary()
{
return stringDictionary;
return stringDictionarySupplier.get();
}
public FixedIndexed<Long> getLongDictionary()
{
return longDictionary;
return longDictionarySupplier.get();
}
public FixedIndexed<Double> getDoubleDictionary()
{
return doubleDictionary;
return doubleDictionarySupplier.get();
}
@Nullable
@ -406,7 +406,7 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
)
);
final FixedIndexed<Integer> localDictionary = FixedIndexed.read(
final Supplier<FixedIndexed<Integer>> localDictionarySupplier = FixedIndexed.read(
dataBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
metadata.getByteOrder(),
@ -436,20 +436,22 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
metadata.getBitmapSerdeFactory().getObjectStrategy(),
columnBuilder.getFileMapper()
);
Supplier<DictionaryEncodedColumn<?>> columnSupplier = () ->
closer.register(new NestedFieldLiteralDictionaryEncodedColumn(
Supplier<DictionaryEncodedColumn<?>> columnSupplier = () -> {
FixedIndexed<Integer> localDict = localDictionarySupplier.get();
return closer.register(new NestedFieldLiteralDictionaryEncodedColumn(
types,
longs.get(),
doubles.get(),
ints.get(),
stringDictionary,
longDictionary,
doubleDictionary,
localDictionary,
localDictionary.get(0) == 0
stringDictionarySupplier.get(),
longDictionarySupplier.get(),
doubleDictionarySupplier.get(),
localDict,
localDict.get(0) == 0
? rBitmaps.get(0)
: metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap()
));
};
columnBuilder.setHasMultipleValues(false)
.setHasNulls(true)
.setDictionaryEncodedColumnSupplier(columnSupplier);
@ -458,10 +460,10 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
types,
metadata.getBitmapSerdeFactory().getBitmapFactory(),
rBitmaps,
localDictionary,
stringDictionary,
longDictionary,
doubleDictionary
localDictionarySupplier,
stringDictionarySupplier,
longDictionarySupplier,
doubleDictionarySupplier
),
true,
false

View File

@ -381,6 +381,12 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
return buffer.getInt();
}
@Override
public Integer read(ByteBuffer buffer, int offset)
{
return buffer.getInt(offset);
}
@Override
public boolean readRetainsBufferReference()
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.nested;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
@ -32,6 +33,7 @@ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.column.StringEncodingStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
import org.apache.druid.segment.data.FixedIndexed;
@ -49,19 +51,32 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
private final GenericIndexed<String> fields;
private final NestedLiteralTypeInfo fieldInfo;
private final GenericIndexed<ByteBuffer> dictionary;
private final Supplier<FrontCodedIndexed> frontCodedDictionary;
private final FixedIndexed<Long> longDictionary;
private final FixedIndexed<Double> doubleDictionary;
private final Supplier<FrontCodedIndexed> frontCodedDictionarySupplier;
private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
private final ColumnConfig columnConfig;
private final SmooshedFileMapper fileMapper;
public NestedDataColumnSupplier(
ByteBuffer bb,
ColumnBuilder columnBuilder,
ColumnConfig columnConfig,
ObjectMapper jsonMapper
)
{
this(bb, columnBuilder, columnConfig, jsonMapper, ColumnType.LONG.getStrategy(), ColumnType.DOUBLE.getStrategy());
}
// strictly for testing?
@VisibleForTesting
public NestedDataColumnSupplier(
ByteBuffer bb,
ColumnBuilder columnBuilder,
ColumnConfig columnConfig,
ObjectMapper jsonMapper,
TypeStrategy<Long> longTypeStrategy,
TypeStrategy<Double> doubleTypeStrategy
)
{
byte version = bb.get();
@ -86,14 +101,14 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) {
final byte encodingId = stringDictionaryBuffer.get();
if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
frontCodedDictionary = FrontCodedIndexed.read(stringDictionaryBuffer, metadata.getByteOrder());
frontCodedDictionarySupplier = FrontCodedIndexed.read(stringDictionaryBuffer, metadata.getByteOrder());
dictionary = null;
} else if (encodingId == StringEncodingStrategy.UTF8_ID) {
// this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but
// this provides backwards compatibility should we switch at some point in the future to always
// writing dictionaryVersion
dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY, mapper);
frontCodedDictionary = null;
frontCodedDictionarySupplier = null;
} else {
throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
}
@ -103,15 +118,15 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
// GenericIndexed version can be correctly read
stringDictionaryBuffer.position(dictionaryStartPosition);
dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY, mapper);
frontCodedDictionary = null;
frontCodedDictionarySupplier = null;
}
final ByteBuffer longDictionaryBuffer = loadInternalFile(
mapper,
NestedDataColumnSerializer.LONG_DICTIONARY_FILE_NAME
);
longDictionary = FixedIndexed.read(
longDictionarySupplier = FixedIndexed.read(
longDictionaryBuffer,
ColumnType.LONG.getStrategy(),
longTypeStrategy,
metadata.getByteOrder(),
Long.BYTES
);
@ -119,13 +134,13 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
mapper,
NestedDataColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
);
doubleDictionary = FixedIndexed.read(
doubleDictionarySupplier = FixedIndexed.read(
doubleDictionaryBuffer,
ColumnType.DOUBLE.getStrategy(),
doubleTypeStrategy,
metadata.getByteOrder(),
Double.BYTES
);
final ByteBuffer rawBuffer = loadInternalFile(mapper, NestedDataColumnSerializer.RAW_FILE_NAME).asReadOnlyBuffer();
final ByteBuffer rawBuffer = loadInternalFile(mapper, NestedDataColumnSerializer.RAW_FILE_NAME);
compressedRawColumnSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
NestedDataColumnSerializer.getInternalFileName(
metadata.getFileNameBase(), NestedDataColumnSerializer.RAW_FILE_NAME
@ -156,6 +171,7 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
@Override
public ComplexColumn get()
{
if (frontCodedDictionarySupplier != null) {
return new CompressedNestedDataComplexColumn<>(
metadata,
columnConfig,
@ -163,9 +179,22 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
nullValues,
fields,
fieldInfo,
frontCodedDictionary == null ? dictionary : frontCodedDictionary.get(),
longDictionary,
doubleDictionary,
frontCodedDictionarySupplier,
longDictionarySupplier,
doubleDictionarySupplier,
fileMapper
);
}
return new CompressedNestedDataComplexColumn<>(
metadata,
columnConfig,
compressedRawColumnSupplier,
nullValues,
fields,
fieldInfo,
dictionary::singleThreaded,
longDictionarySupplier,
doubleDictionarySupplier,
fileMapper
);
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.nested;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.primitives.Doubles;
import it.unimi.dsi.fastutil.doubles.DoubleArraySet;
import it.unimi.dsi.fastutil.doubles.DoubleIterator;
@ -77,10 +78,10 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
private final ColumnType singleType;
private final BitmapFactory bitmapFactory;
private final GenericIndexed<ImmutableBitmap> bitmaps;
private final FixedIndexed<Integer> dictionary;
private final TStringDictionary globalDictionary;
private final FixedIndexed<Long> globalLongDictionary;
private final FixedIndexed<Double> globalDoubleDictionary;
private final Supplier<FixedIndexed<Integer>> localDictionarySupplier;
private final Supplier<TStringDictionary> globalStringDictionarySupplier;
private final Supplier<FixedIndexed<Long>> globalLongDictionarySupplier;
private final Supplier<FixedIndexed<Double>> globalDoubleDictionarySupplier;
private final int adjustLongId;
private final int adjustDoubleId;
@ -89,21 +90,21 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
NestedLiteralTypeInfo.TypeSet types,
BitmapFactory bitmapFactory,
GenericIndexed<ImmutableBitmap> bitmaps,
FixedIndexed<Integer> dictionary,
TStringDictionary globalDictionary,
FixedIndexed<Long> globalLongDictionary,
FixedIndexed<Double> globalDoubleDictionary
Supplier<FixedIndexed<Integer>> localDictionarySupplier,
Supplier<TStringDictionary> globalStringDictionarySupplier,
Supplier<FixedIndexed<Long>> globalLongDictionarySupplier,
Supplier<FixedIndexed<Double>> globalDoubleDictionarySupplier
)
{
this.singleType = types.getSingleType();
this.bitmapFactory = bitmapFactory;
this.bitmaps = bitmaps;
this.dictionary = dictionary;
this.globalDictionary = globalDictionary;
this.globalLongDictionary = globalLongDictionary;
this.globalDoubleDictionary = globalDoubleDictionary;
this.adjustLongId = globalDictionary.size();
this.adjustDoubleId = adjustLongId + globalLongDictionary.size();
this.localDictionarySupplier = localDictionarySupplier;
this.globalStringDictionarySupplier = globalStringDictionarySupplier;
this.globalLongDictionarySupplier = globalLongDictionarySupplier;
this.globalDoubleDictionarySupplier = globalDoubleDictionarySupplier;
this.adjustLongId = globalStringDictionarySupplier.get().size();
this.adjustDoubleId = adjustLongId + globalLongDictionarySupplier.get().size();
}
@Nullable
@ -112,7 +113,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
if (clazz.equals(NullValueIndex.class)) {
final BitmapColumnIndex nullIndex;
if (dictionary.get(0) == 0) {
if (localDictionarySupplier.get().get(0) == 0) {
// null index is always 0 in the global dictionary, even if there are no null rows in any of the literal columns
nullIndex = new SimpleImmutableBitmapIndex(bitmaps.get(0));
} else {
@ -175,7 +176,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
}
/**
* Gets a value range from a global dictionary and maps it to a range on the local {@link #dictionary}.
* Gets a value range from a global dictionary and maps it to a range on the local {@link #localDictionarySupplier}.
* The starting index of the resulting range is inclusive, while the endpoint is exclusive [start, end)
*/
private <T> IntIntPair getLocalRangeFromDictionary(
@ -183,6 +184,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
boolean startStrict,
@Nullable T endValue,
boolean endStrict,
Indexed<Integer> localDictionary,
Indexed<T> globalDictionary,
int adjust
)
@ -199,8 +201,9 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
globalStartIndex = adjust + (-(found + 1));
}
}
// with starting global index settled, now lets find starting local index
int localFound = dictionary.indexOf(globalStartIndex);
int localFound = localDictionary.indexOf(globalStartIndex);
if (localFound < 0) {
// the first valid global index is not within the local dictionary, so the insertion point is where we begin
localStartIndex = -(localFound + 1);
@ -221,7 +224,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
}
globalEndIndex = Math.max(globalStartIndex, globalEndIndex);
// end index is not inclusive, so we find the last value in the local dictionary that falls within the range
int localEndFound = dictionary.indexOf(globalEndIndex - 1);
int localEndFound = localDictionary.indexOf(globalEndIndex - 1);
if (localEndFound < 0) {
localEndIndex = -localEndFound;
} else {
@ -229,7 +232,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
localEndIndex = localEndFound + 1;
}
return new IntIntImmutablePair(localStartIndex, Math.min(dictionary.size(), localEndIndex));
return new IntIntImmutablePair(localStartIndex, Math.min(localDictionary.size(), localEndIndex));
}
@ -238,6 +241,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
boolean startStrict,
@Nullable T endValue,
boolean endStrict,
Indexed<Integer> localDictionary,
Indexed<T> globalDictionary,
int adjust
)
@ -247,6 +251,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
startStrict,
endValue,
endStrict,
localDictionary,
globalDictionary,
adjust
);
@ -279,23 +284,28 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
private class NestedLiteralDictionaryEncodedStringValueIndex implements DictionaryEncodedStringValueIndex
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final Indexed<ByteBuffer> stringDictionary = globalStringDictionarySupplier.get();
final FixedIndexed<Long> longDictionary = globalLongDictionarySupplier.get();
final FixedIndexed<Double> doubleDictionary = globalDoubleDictionarySupplier.get();
@Override
public int getCardinality()
{
return dictionary.size();
return localDictionary.size();
}
@Nullable
@Override
public String getValue(int index)
{
int globalIndex = dictionary.get(index);
int globalIndex = localDictionary.get(index);
if (globalIndex < adjustLongId) {
return StringUtils.fromUtf8Nullable(globalDictionary.get(globalIndex));
return StringUtils.fromUtf8Nullable(stringDictionary.get(globalIndex));
} else if (globalIndex < adjustDoubleId) {
return String.valueOf(globalLongDictionary.get(globalIndex - adjustLongId));
return String.valueOf(longDictionary.get(globalIndex - adjustLongId));
} else {
return String.valueOf(globalDoubleDictionary.get(globalIndex - adjustDoubleId));
return String.valueOf(doubleDictionary.get(globalIndex - adjustDoubleId));
}
}
@ -313,11 +323,13 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
return new SimpleBitmapColumnIndex()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final Indexed<ByteBuffer> stringDictionary = globalStringDictionarySupplier.get();
@Override
public double estimateSelectivity(int totalRows)
{
return (double) getBitmap(
dictionary.indexOf(globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value)))
localDictionary.indexOf(stringDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value)))
).size() / totalRows;
}
@ -325,9 +337,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{
return bitmapResultFactory.wrapDimensionValue(
getBitmap(
dictionary.indexOf(globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value)))
)
getBitmap(localDictionary.indexOf(stringDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value))))
);
}
};
@ -343,6 +353,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
return () -> new Iterator<ImmutableBitmap>()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final Indexed<ByteBuffer> stringDictionary = globalStringDictionarySupplier.get();
final Iterator<String> iterator = values.iterator();
int next = -1;
@ -373,7 +385,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
while (next < 0 && iterator.hasNext()) {
String nextValue = iterator.next();
next = dictionary.indexOf(globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(nextValue)));
next = localDictionary.indexOf(stringDictionary.indexOf(StringUtils.toUtf8ByteBuffer(nextValue)));
}
}
};
@ -397,7 +409,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
startStrict,
StringUtils.toUtf8ByteBuffer(NullHandling.emptyToNullIfNeeded(endValue)),
endStrict,
globalDictionary,
localDictionarySupplier.get(),
globalStringDictionarySupplier.get(),
0
);
}
@ -416,12 +429,16 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
@Override
public Iterable<ImmutableBitmap> getBitmapIterable()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final Indexed<ByteBuffer> stringDictionary = globalStringDictionarySupplier.get();
final IntIntPair range = getLocalRangeFromDictionary(
StringUtils.toUtf8ByteBuffer(startValue),
startStrict,
StringUtils.toUtf8ByteBuffer(endValue),
endStrict,
globalDictionary,
localDictionary,
stringDictionary,
0
);
final int start = range.leftInt(), end = range.rightInt();
@ -436,7 +453,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
private int findNext()
{
while (currIndex < end && !matcher.apply(StringUtils.fromUtf8Nullable(globalDictionary.get(dictionary.get(currIndex))))) {
while (currIndex < end && !matcher.apply(StringUtils.fromUtf8Nullable(stringDictionary.get(localDictionary.get(currIndex))))) {
currIndex++;
}
@ -484,10 +501,13 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
return () -> new Iterator<ImmutableBitmap>()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final Indexed<ByteBuffer> stringDictionary = globalStringDictionarySupplier.get();
final Predicate<String> stringPredicate = matcherFactory.makeStringPredicate();
// in the future, this could use an int iterator
final Iterator<Integer> iterator = dictionary.iterator();
final Iterator<Integer> iterator = localDictionary.iterator();
int next;
int index = 0;
boolean nextSet = false;
@ -518,7 +538,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
while (!nextSet && iterator.hasNext()) {
Integer nextValue = iterator.next();
nextSet = stringPredicate.apply(StringUtils.fromUtf8Nullable(globalDictionary.get(nextValue)));
nextSet = stringPredicate.apply(StringUtils.fromUtf8Nullable(stringDictionary.get(nextValue)));
if (nextSet) {
next = index;
}
@ -539,22 +559,29 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
final Long longValue = GuavaUtils.tryParseLong(value);
return new SimpleBitmapColumnIndex()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final FixedIndexed<Long> longDictionary = globalLongDictionarySupplier.get();
@Override
public double estimateSelectivity(int totalRows)
{
if (longValue == null) {
return (double) getBitmap(dictionary.indexOf(0)).size() / totalRows;
return (double) getBitmap(localDictionary.indexOf(0)).size() / totalRows;
}
return (double) getBitmap(dictionary.indexOf(globalLongDictionary.indexOf(longValue) + adjustLongId)).size() / totalRows;
return (double) getBitmap(
localDictionary.indexOf(longDictionary.indexOf(longValue) + adjustLongId)
).size() / totalRows;
}
@Override
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{
if (longValue == null) {
return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(0)));
return bitmapResultFactory.wrapDimensionValue(getBitmap(localDictionary.indexOf(0)));
}
return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(globalLongDictionary.indexOf(longValue) + adjustLongId)));
return bitmapResultFactory.wrapDimensionValue(
getBitmap(localDictionary.indexOf(longDictionary.indexOf(longValue) + adjustLongId))
);
}
};
}
@ -582,6 +609,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
final boolean doNullCheck = needNullCheck;
return () -> new Iterator<ImmutableBitmap>()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final FixedIndexed<Long> longDictionary = globalLongDictionarySupplier.get();
final LongIterator iterator = longs.iterator();
int next = -1;
boolean nullChecked = false;
@ -620,7 +649,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
while (next < 0 && iterator.hasNext()) {
long nextValue = iterator.nextLong();
next = dictionary.indexOf(globalLongDictionary.indexOf(nextValue) + adjustLongId);
next = localDictionary.indexOf(longDictionary.indexOf(nextValue) + adjustLongId);
}
}
};
@ -644,7 +673,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
startStrict,
endValue != null ? endValue.longValue() : null,
endStrict,
globalLongDictionary,
localDictionarySupplier.get(),
globalLongDictionarySupplier.get(),
adjustLongId
);
}
@ -662,10 +692,12 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
return () -> new Iterator<ImmutableBitmap>()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final FixedIndexed<Long> longDictionary = globalLongDictionarySupplier.get();
final DruidLongPredicate longPredicate = matcherFactory.makeLongPredicate();
// in the future, this could use an int iterator
final Iterator<Integer> iterator = dictionary.iterator();
final Iterator<Integer> iterator = localDictionary.iterator();
int next;
int index = 0;
boolean nextSet = false;
@ -700,7 +732,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
if (nextValue == 0) {
nextSet = longPredicate.applyNull();
} else {
nextSet = longPredicate.applyLong(globalLongDictionary.get(nextValue - adjustLongId));
nextSet = longPredicate.applyLong(longDictionary.get(nextValue - adjustLongId));
}
if (nextSet) {
next = index;
@ -722,22 +754,28 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
final Double doubleValue = Strings.isNullOrEmpty(value) ? null : Doubles.tryParse(value);
return new SimpleBitmapColumnIndex()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final FixedIndexed<Double> doubleDictionary = globalDoubleDictionarySupplier.get();
@Override
public double estimateSelectivity(int totalRows)
{
if (doubleValue == null) {
return (double) getBitmap(dictionary.indexOf(0)).size() / totalRows;
return (double) getBitmap(localDictionary.indexOf(0)).size() / totalRows;
}
return (double) getBitmap(dictionary.indexOf(globalDoubleDictionary.indexOf(doubleValue) + adjustDoubleId)).size() / totalRows;
return (double) getBitmap(
localDictionary.indexOf(doubleDictionary.indexOf(doubleValue) + adjustDoubleId)
).size() / totalRows;
}
@Override
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{
if (doubleValue == null) {
return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(0)));
return bitmapResultFactory.wrapDimensionValue(getBitmap(localDictionary.indexOf(0)));
}
return bitmapResultFactory.wrapDimensionValue(getBitmap(dictionary.indexOf(globalDoubleDictionary.indexOf(doubleValue) + adjustDoubleId)));
return bitmapResultFactory.wrapDimensionValue(
getBitmap(localDictionary.indexOf(doubleDictionary.indexOf(doubleValue) + adjustDoubleId))
);
}
};
}
@ -765,6 +803,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
final boolean doNullCheck = needNullCheck;
return () -> new Iterator<ImmutableBitmap>()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final FixedIndexed<Double> doubleDictionary = globalDoubleDictionarySupplier.get();
final DoubleIterator iterator = doubles.iterator();
int next = -1;
boolean nullChecked = false;
@ -803,7 +843,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
while (next < 0 && iterator.hasNext()) {
double nextValue = iterator.nextDouble();
next = dictionary.indexOf(globalDoubleDictionary.indexOf(nextValue) + adjustDoubleId);
next = localDictionary.indexOf(doubleDictionary.indexOf(nextValue) + adjustDoubleId);
}
}
};
@ -827,7 +867,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
startStrict,
endValue != null ? endValue.doubleValue() : null,
endStrict,
globalDoubleDictionary,
localDictionarySupplier.get(),
globalDoubleDictionarySupplier.get(),
adjustDoubleId
);
}
@ -845,10 +886,12 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
{
return () -> new Iterator<ImmutableBitmap>()
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final FixedIndexed<Double> doubleDictionary = globalDoubleDictionarySupplier.get();
final DruidDoublePredicate doublePredicate = matcherFactory.makeDoublePredicate();
// in the future, this could use an int iterator
final Iterator<Integer> iterator = dictionary.iterator();
final Iterator<Integer> iterator = localDictionary.iterator();
int next;
int index = 0;
boolean nextSet = false;
@ -882,7 +925,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
if (nextValue == 0) {
nextSet = doublePredicate.applyNull();
} else {
nextSet = doublePredicate.applyDouble(globalDoubleDictionary.get(nextValue - adjustDoubleId));
nextSet = doublePredicate.applyDouble(doubleDictionary.get(nextValue - adjustDoubleId));
}
if (nextSet) {
next = index;
@ -898,24 +941,29 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
private abstract class NestedVariantLiteralIndex
{
final FixedIndexed<Integer> localDictionary = localDictionarySupplier.get();
final Indexed<ByteBuffer> stringDictionary = globalStringDictionarySupplier.get();
final FixedIndexed<Long> longDictionary = globalLongDictionarySupplier.get();
final FixedIndexed<Double> doubleDictionary = globalDoubleDictionarySupplier.get();
IntList getIndexes(@Nullable String value)
{
IntList intList = new IntArrayList();
if (value == null) {
intList.add(dictionary.indexOf(0));
intList.add(localDictionary.indexOf(0));
return intList;
}
// multi-type, return all that match
int globalId = globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value));
int localId = dictionary.indexOf(globalId);
int globalId = stringDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value));
int localId = localDictionary.indexOf(globalId);
if (localId >= 0) {
intList.add(localId);
}
Long someLong = GuavaUtils.tryParseLong(value);
if (someLong != null) {
globalId = globalLongDictionary.indexOf(someLong);
localId = dictionary.indexOf(globalId + adjustLongId);
globalId = longDictionary.indexOf(someLong);
localId = localDictionary.indexOf(globalId + adjustLongId);
if (localId >= 0) {
intList.add(localId);
}
@ -923,8 +971,8 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
Double someDouble = Doubles.tryParse(value);
if (someDouble != null) {
globalId = globalDoubleDictionary.indexOf(someDouble);
localId = dictionary.indexOf(globalId + adjustDoubleId);
globalId = doubleDictionary.indexOf(someDouble);
localId = localDictionary.indexOf(globalId + adjustDoubleId);
if (localId >= 0) {
intList.add(localId);
}
@ -1032,7 +1080,7 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
final DruidDoublePredicate doublePredicate = matcherFactory.makeDoublePredicate();
// in the future, this could use an int iterator
final Iterator<Integer> iterator = dictionary.iterator();
final Iterator<Integer> iterator = localDictionary.iterator();
int next;
int index;
boolean nextSet = false;
@ -1064,11 +1112,11 @@ public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Ind
while (!nextSet && iterator.hasNext()) {
Integer nextValue = iterator.next();
if (nextValue >= adjustDoubleId) {
nextSet = doublePredicate.applyDouble(globalDoubleDictionary.get(nextValue - adjustDoubleId));
nextSet = doublePredicate.applyDouble(doubleDictionary.get(nextValue - adjustDoubleId));
} else if (nextValue >= adjustLongId) {
nextSet = longPredicate.applyLong(globalLongDictionary.get(nextValue - adjustLongId));
nextSet = longPredicate.applyLong(longDictionary.get(nextValue - adjustLongId));
} else {
nextSet = stringPredicate.apply(StringUtils.fromUtf8Nullable(globalDictionary.get(nextValue)));
nextSet = stringPredicate.apply(StringUtils.fromUtf8Nullable(stringDictionary.get(nextValue)));
}
if (nextSet) {
next = index;

View File

@ -68,7 +68,8 @@ public class FixedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 14);
fillBuffer(buffer, order, false);
FixedIndexed<Long> fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES);
FixedIndexed<Long> fixedIndexed =
FixedIndexed.<Long>read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES).get();
Assert.assertEquals(64, fixedIndexed.size());
for (int i = 0; i < LONGS.length; i++) {
Assert.assertEquals(LONGS[i], fixedIndexed.get(i));
@ -81,7 +82,8 @@ public class FixedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 14);
fillBuffer(buffer, order, false);
FixedIndexed<Long> fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES);
FixedIndexed<Long> fixedIndexed =
FixedIndexed.<Long>read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES).get();
Iterator<Long> iterator = fixedIndexed.iterator();
int i = 0;
while (iterator.hasNext()) {
@ -94,7 +96,8 @@ public class FixedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 14);
fillBuffer(buffer, order, true);
FixedIndexed<Long> fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES);
FixedIndexed<Long> fixedIndexed =
FixedIndexed.<Long>read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES).get();
Assert.assertEquals(65, fixedIndexed.size());
Assert.assertNull(fixedIndexed.get(0));
for (int i = 0; i < LONGS.length; i++) {
@ -108,7 +111,8 @@ public class FixedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 14);
fillBuffer(buffer, order, true);
FixedIndexed<Long> fixedIndexed = FixedIndexed.read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES);
FixedIndexed<Long> fixedIndexed =
FixedIndexed.<Long>read(buffer, ColumnType.LONG.getStrategy(), order, Long.BYTES).get();
Iterator<Long> iterator = fixedIndexed.iterator();
Assert.assertNull(iterator.next());
int i = 0;

View File

@ -0,0 +1,356 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.nested;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.NestedDataColumnIndexer;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.NullValueIndex;
import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
{
private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
DefaultBitmapResultFactory resultFactory = new DefaultBitmapResultFactory(new RoaringBitmapFactory());
List<Map<String, Object>> data = ImmutableList.of(
ImmutableMap.of("x", 1L, "y", 1.0, "z", "a"),
ImmutableMap.of("y", 3.0, "z", "d"),
ImmutableMap.of("x", 5L, "y", 5.0, "z", "b"),
ImmutableMap.of("x", 3L, "y", 4.0, "z", "c"),
ImmutableMap.of("x", 2L),
ImmutableMap.of("x", 4L, "y", 2.0, "z", "e")
);
Closer closer = Closer.create();
SmooshedFileMapper fileMapper;
ByteBuffer baseBuffer;
@Before
public void setup() throws IOException
{
final String fileNameBase = "test";
TmpFileSegmentWriteOutMediumFactory writeOutMediumFactory = TmpFileSegmentWriteOutMediumFactory.instance();
final File tmpFile = tempFolder.newFolder();
try (final FileSmoosher smoosher = new FileSmoosher(tmpFile)) {
NestedDataColumnSerializer serializer = new NestedDataColumnSerializer(
fileNameBase,
new IndexSpec(),
writeOutMediumFactory.makeSegmentWriteOutMedium(tempFolder.newFolder()),
new BaseProgressIndicator(),
closer
);
NestedDataColumnIndexer indexer = new NestedDataColumnIndexer();
for (Object o : data) {
indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
}
SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> sortedFields = new TreeMap<>();
indexer.mergeFields(sortedFields);
GlobalDictionarySortedCollector globalDictionarySortedCollector = indexer.getSortedCollector();
serializer.open();
serializer.serializeFields(sortedFields);
serializer.serializeStringDictionary(globalDictionarySortedCollector.getSortedStrings());
serializer.serializeLongDictionary(globalDictionarySortedCollector.getSortedLongs());
serializer.serializeDoubleDictionary(globalDictionarySortedCollector.getSortedDoubles());
SettableSelector valueSelector = new SettableSelector();
for (Object o : data) {
valueSelector.setObject(StructuredData.wrap(o));
serializer.serialize(valueSelector);
}
try (SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize())) {
serializer.writeTo(writer, smoosher);
}
smoosher.close();
fileMapper = closer.register(SmooshedFileMapper.load(tmpFile));
baseBuffer = fileMapper.mapFile(fileNameBase);
}
}
@After
public void teardown() throws IOException
{
closer.close();
}
@Test
public void testBasicFunctionality() throws IOException
{
ColumnBuilder bob = new ColumnBuilder();
bob.setFileMapper(fileMapper);
NestedDataColumnSupplier supplier = new NestedDataColumnSupplier(
baseBuffer,
bob,
() -> 0,
NestedDataComplexTypeSerde.OBJECT_MAPPER,
new OnlyPositionalReadsTypeStrategy<>(ColumnType.LONG.getStrategy()),
new OnlyPositionalReadsTypeStrategy<>(ColumnType.DOUBLE.getStrategy())
);
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
}
}
@Test
public void testConcurrency() throws ExecutionException, InterruptedException
{
// if this test ever starts being to be a flake, there might be thread safety issues
ColumnBuilder bob = new ColumnBuilder();
bob.setFileMapper(fileMapper);
NestedDataColumnSupplier supplier = new NestedDataColumnSupplier(
baseBuffer,
bob,
() -> 0,
NestedDataComplexTypeSerde.OBJECT_MAPPER
);
final String expectedReason = "none";
final AtomicReference<String> failureReason = new AtomicReference<>(expectedReason);
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threads));
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
private void smokeTest(NestedDataComplexColumn column) throws IOException
{
SimpleAscendingOffset offset = new SimpleAscendingOffset(data.size());
ColumnValueSelector<?> rawSelector = column.makeColumnValueSelector(offset);
final List<NestedPathPart> xPath = NestedPathFinder.parseJsonPath("$.x");
ColumnValueSelector<?> xSelector = column.makeColumnValueSelector(xPath, offset);
ColumnIndexSupplier xIndexSupplier = column.getColumnIndexSupplier(xPath);
Assert.assertNotNull(xIndexSupplier);
StringValueSetIndex xValueIndex = xIndexSupplier.as(StringValueSetIndex.class);
NullValueIndex xNulls = xIndexSupplier.as(NullValueIndex.class);
final List<NestedPathPart> yPath = NestedPathFinder.parseJsonPath("$.y");
ColumnValueSelector<?> ySelector = column.makeColumnValueSelector(yPath, offset);
ColumnIndexSupplier yIndexSupplier = column.getColumnIndexSupplier(yPath);
Assert.assertNotNull(yIndexSupplier);
StringValueSetIndex yValueIndex = yIndexSupplier.as(StringValueSetIndex.class);
NullValueIndex yNulls = yIndexSupplier.as(NullValueIndex.class);
final List<NestedPathPart> zPath = NestedPathFinder.parseJsonPath("$.z");
ColumnValueSelector<?> zSelector = column.makeColumnValueSelector(zPath, offset);
ColumnIndexSupplier zIndexSupplier = column.getColumnIndexSupplier(zPath);
Assert.assertNotNull(zIndexSupplier);
StringValueSetIndex zValueIndex = zIndexSupplier.as(StringValueSetIndex.class);
NullValueIndex zNulls = zIndexSupplier.as(NullValueIndex.class);
for (int i = 0; i < data.size(); i++) {
Map row = data.get(i);
Assert.assertEquals(
JSON_MAPPER.writeValueAsString(row),
JSON_MAPPER.writeValueAsString(StructuredData.unwrap(rawSelector.getObject()))
);
if (row.containsKey("x")) {
Assert.assertEquals(row.get("x"), xSelector.getObject());
Assert.assertEquals(row.get("x"), xSelector.getLong());
Assert.assertTrue(xValueIndex.forValue(String.valueOf(row.get("x"))).computeBitmapResult(resultFactory).get(i));
Assert.assertFalse(xNulls.forNull().computeBitmapResult(resultFactory).get(i));
} else {
Assert.assertNull(xSelector.getObject());
Assert.assertTrue(xSelector.isNull());
Assert.assertTrue(xValueIndex.forValue(null).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(xNulls.forNull().computeBitmapResult(resultFactory).get(i));
}
if (row.containsKey("y")) {
Assert.assertEquals(row.get("y"), ySelector.getObject());
Assert.assertEquals(row.get("y"), ySelector.getDouble());
Assert.assertTrue(yValueIndex.forValue(String.valueOf(row.get("y"))).computeBitmapResult(resultFactory).get(i));
Assert.assertFalse(yNulls.forNull().computeBitmapResult(resultFactory).get(i));
} else {
Assert.assertNull(ySelector.getObject());
Assert.assertTrue(ySelector.isNull());
Assert.assertTrue(yValueIndex.forValue(null).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(yNulls.forNull().computeBitmapResult(resultFactory).get(i));
}
if (row.containsKey("z")) {
Assert.assertEquals(row.get("z"), zSelector.getObject());
Assert.assertTrue(zValueIndex.forValue((String) row.get("z")).computeBitmapResult(resultFactory).get(i));
Assert.assertFalse(zNulls.forNull().computeBitmapResult(resultFactory).get(i));
} else {
Assert.assertNull(zSelector.getObject());
Assert.assertTrue(zValueIndex.forValue(null).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(zNulls.forNull().computeBitmapResult(resultFactory).get(i));
}
offset.increment();
}
}
private static class SettableSelector extends ObjectColumnSelector<StructuredData>
{
private StructuredData data;
public void setObject(StructuredData o)
{
this.data = o;
}
@Nullable
@Override
public StructuredData getObject()
{
return data;
}
@Override
public Class classOfObject()
{
return StructuredData.class;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
private static class OnlyPositionalReadsTypeStrategy<T> implements TypeStrategy<T>
{
private final TypeStrategy<T> delegate;
private OnlyPositionalReadsTypeStrategy(TypeStrategy<T> delegate)
{
this.delegate = delegate;
}
@Override
public int estimateSizeBytes(T value)
{
return delegate.estimateSizeBytes(value);
}
@Override
public T read(ByteBuffer buffer)
{
throw new IllegalStateException("non-positional read");
}
@Override
public boolean readRetainsBufferReference()
{
return delegate.readRetainsBufferReference();
}
@Override
public int write(ByteBuffer buffer, T value, int maxSizeBytes)
{
return delegate.write(buffer, value, maxSizeBytes);
}
@Override
public T read(ByteBuffer buffer, int offset)
{
return delegate.read(buffer, offset);
}
@Override
public int write(ByteBuffer buffer, int offset, T value, int maxSizeBytes)
{
return delegate.write(buffer, offset, value, maxSizeBytes);
}
@Override
public int compare(T o1, T o2)
{
return delegate.compare(o1, o2);
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.nested;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
@ -63,9 +64,9 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
BitmapResultFactory<ImmutableBitmap> bitmapResultFactory = new DefaultBitmapResultFactory(
roaringFactory.getBitmapFactory()
);
Indexed<ByteBuffer> globalStrings;
FixedIndexed<Long> globalLongs;
FixedIndexed<Double> globalDoubles;
Supplier<Indexed<ByteBuffer>> globalStrings;
Supplier<FixedIndexed<Long>> globalLongs;
Supplier<FixedIndexed<Double>> globalDoubles;
@Before
public void setup() throws IOException
@ -124,7 +125,8 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
doubleWriter.write(9.9);
writeToBuffer(doubleBuffer, doubleWriter);
globalStrings = GenericIndexed.read(stringBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY);
GenericIndexed<ByteBuffer> strings = GenericIndexed.read(stringBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY);
globalStrings = () -> strings.singleThreaded();
globalLongs = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES);
globalDoubles = FixedIndexed.read(doubleBuffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES);
}
@ -1021,7 +1023,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1036,7 +1038,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles
@ -1095,7 +1097,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1110,7 +1112,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles
@ -1166,7 +1168,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1181,7 +1183,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles
@ -1241,7 +1243,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1256,7 +1258,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles
@ -1312,7 +1314,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1327,7 +1329,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles
@ -1387,7 +1389,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1402,7 +1404,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles
@ -1470,7 +1472,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
writeToBuffer(localDictionaryBuffer, localDictionaryWriter);
writeToBuffer(bitmapsBuffer, bitmapWriter);
FixedIndexed<Integer> dictionary = FixedIndexed.read(
Supplier<FixedIndexed<Integer>> dictionarySupplier = FixedIndexed.read(
localDictionaryBuffer,
NestedDataColumnSerializer.INT_TYPE_STRATEGY,
ByteOrder.nativeOrder(),
@ -1488,7 +1490,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
),
roaringFactory.getBitmapFactory(),
bitmaps,
dictionary,
dictionarySupplier,
globalStrings,
globalLongs,
globalDoubles