consolidate json and auto indexers, remove v4 nested column serializer (#14456)

Clint Wylie 2023-08-22 18:50:11 -07:00 committed by GitHub
parent 6817de9376
commit fb053c399c
33 changed files with 72 additions and 2320 deletions

View File

@@ -34,8 +34,8 @@ import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.StringEncodingStrategy;
import org.apache.druid.segment.data.FrontCodedIndexed;
@@ -298,7 +298,7 @@ public class SqlNestedDataBenchmark
);
List<DimensionSchema> dims = ImmutableList.<DimensionSchema>builder()
.addAll(schemaInfo.getDimensionsSpec().getDimensions())
-.add(new NestedDataDimensionSchema("nested"))
+.add(new AutoTypeColumnSchema("nested"))
.build();
DimensionsSpec dimsSpec = new DimensionsSpec(dims);

View File

@@ -27,12 +27,14 @@ import TabItem from '@theme/TabItem';
~ under the License.
-->
-Apache Druid supports directly storing nested data structures in `COMPLEX<json>` columns. `COMPLEX<json>` columns store a copy of the structured data in JSON format and specialized internal columns and indexes for nested literal values&mdash;STRING, LONG, and DOUBLE types. An optimized [virtual column](./virtual-columns.md#nested-field-virtual-column) allows Druid to read and filter these values at speeds consistent with standard Druid LONG, DOUBLE, and STRING columns.
+Apache Druid supports directly storing nested data structures in `COMPLEX<json>` columns. `COMPLEX<json>` columns store a copy of the structured data in JSON format and specialized internal columns and indexes for nested literal values&mdash;STRING, LONG, and DOUBLE types, as well as ARRAY of STRING, LONG, and DOUBLE values. An optimized [virtual column](./virtual-columns.md#nested-field-virtual-column) allows Druid to read and filter these values at speeds consistent with standard Druid LONG, DOUBLE, and STRING columns.
Druid [SQL JSON functions](./sql-json-functions.md) allow you to extract, transform, and create `COMPLEX<json>` values in SQL queries, using the specialized virtual columns where appropriate. You can use the [JSON nested columns functions](math-expr.md#json-functions) in [native queries](./querying.md) using [expression virtual columns](./virtual-columns.md#expression-virtual-column), and in native ingestion with a [`transformSpec`](../ingestion/ingestion-spec.md#transformspec).
You can use the JSON functions in INSERT and REPLACE statements in SQL-based ingestion, or in a `transformSpec` in native ingestion as an alternative to using a [`flattenSpec`](../ingestion/data-formats.md#flattenspec) object to "flatten" nested data for ingestion.
Columns ingested as `COMPLEX<json>` are automatically optimized to store the most appropriate physical column based on the data processed: for example, a LONG column if only LONG values are processed, an ARRAY column if the data consists of arrays, or `COMPLEX<json>` in the general case where the data is truly nested. This is the same functionality that powers ['type aware' schema discovery](../ingestion/schema-design.md#type-aware-schema-discovery).
Druid supports directly ingesting nested data with the following formats: JSON, Parquet, Avro, ORC, and Protobuf.
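For example, here is a minimal sketch of a native ingestion `dimensionsSpec` that stores one column as nested data (the `product` and `shipTo` field names are hypothetical):

```json
{
  "dimensionsSpec": {
    "dimensions": [
      "product",
      { "type": "json", "name": "shipTo" }
    ]
  }
}
```

Equivalently, `{ "type": "auto", "name": "shipTo" }` should produce the same column, since this change backs the `json` type with the same auto indexer that powers schema discovery.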
## Example nested data

View File

@@ -47,7 +47,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
-import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
@@ -303,15 +303,15 @@ public class AvroStreamInputFormatTest extends InitializedNullHandlingTest
DimensionsSpec dimensionsSpec = new DimensionsSpec(
ImmutableList.of(
-new NestedDataDimensionSchema("someIntValueMap"),
-new NestedDataDimensionSchema("someStringValueMap"),
-new NestedDataDimensionSchema("someRecord"),
-new NestedDataDimensionSchema("someRecordArray"),
+new AutoTypeColumnSchema("someIntValueMap"),
+new AutoTypeColumnSchema("someStringValueMap"),
+new AutoTypeColumnSchema("someRecord"),
+new AutoTypeColumnSchema("someRecordArray"),
new LongDimensionSchema("tSomeIntValueMap8"),
new LongDimensionSchema("tSomeIntValueMap8_2"),
new StringDimensionSchema("tSomeStringValueMap8"),
new LongDimensionSchema("tSomeRecordSubLong"),
new NestedDataDimensionSchema("tSomeRecordArray0"),
new AutoTypeColumnSchema("tSomeRecordArray0"),
new StringDimensionSchema("tSomeRecordArray0nestedString")
)
);

View File

@@ -37,7 +37,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
-import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.TransformingInputEntityReader;
@@ -333,9 +333,9 @@ public class OrcReaderTest extends InitializedNullHandlingTest
new TimestampSpec("ts", "millis", null),
new DimensionsSpec(
ImmutableList.of(
-new NestedDataDimensionSchema("middle"),
-new NestedDataDimensionSchema("list"),
-new NestedDataDimensionSchema("map")
+new AutoTypeColumnSchema("middle"),
+new AutoTypeColumnSchema("list"),
+new AutoTypeColumnSchema("map")
)
),
inputFormat,
@@ -542,8 +542,8 @@ public class OrcReaderTest extends InitializedNullHandlingTest
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
ImmutableList.of(
-new NestedDataDimensionSchema("a"),
-new NestedDataDimensionSchema("b")
+new AutoTypeColumnSchema("a"),
+new AutoTypeColumnSchema("b")
)
),
inputFormat,
@@ -608,11 +608,11 @@ public class OrcReaderTest extends InitializedNullHandlingTest
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
ImmutableList.of(
-new NestedDataDimensionSchema("a"),
-new NestedDataDimensionSchema("b"),
-new NestedDataDimensionSchema("c"),
-new NestedDataDimensionSchema("d"),
-new NestedDataDimensionSchema("t_d_0")
+new AutoTypeColumnSchema("a"),
+new AutoTypeColumnSchema("b"),
+new AutoTypeColumnSchema("c"),
+new AutoTypeColumnSchema("d"),
+new AutoTypeColumnSchema("t_d_0")
)
),
inputFormat,

View File

@@ -31,7 +31,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
-import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.TransformingInputEntityReader;
@@ -51,8 +51,8 @@ public class NestedColumnParquetReaderTest extends BaseParquetReaderTest
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
ImmutableList.of(
-new NestedDataDimensionSchema("nestedData"),
-new NestedDataDimensionSchema("t_nestedData_listDim"),
+new AutoTypeColumnSchema("nestedData"),
+new AutoTypeColumnSchema("t_nestedData_listDim"),
new StringDimensionSchema("t_nestedData_listDim_string"),
new StringDimensionSchema("t_nestedData_dim2"),
new LongDimensionSchema("t_nestedData_dim3"),
@@ -105,10 +105,10 @@ public class NestedColumnParquetReaderTest extends BaseParquetReaderTest
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(
ImmutableList.of(
-new NestedDataDimensionSchema("a1"),
-new NestedDataDimensionSchema("a2"),
-new NestedDataDimensionSchema("t_a2"),
-new NestedDataDimensionSchema("t_a1_b1"),
+new AutoTypeColumnSchema("a1"),
+new AutoTypeColumnSchema("a2"),
+new AutoTypeColumnSchema("t_a2"),
+new AutoTypeColumnSchema("t_a1_b1"),
new LongDimensionSchema("t_a1_b1_c1"),
new LongDimensionSchema("t_e2_0_b1"),
new LongDimensionSchema("tt_a2_0_b1")

View File

@@ -33,7 +33,6 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.segment.AutoTypeColumnSchema;
-import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
@@ -257,9 +256,9 @@ public class InputSourceSamplerDiscoveryTest extends InitializedNullHandlingTest
new LongDimensionSchema("long"),
new DoubleDimensionSchema("double"),
new StringDimensionSchema("bool"),
-new NestedDataDimensionSchema("variant"),
-new NestedDataDimensionSchema("array"),
-new NestedDataDimensionSchema("nested")
+new AutoTypeColumnSchema("variant"),
+new AutoTypeColumnSchema("array"),
+new AutoTypeColumnSchema("nested")
)
).build(),
null,
@@ -292,8 +291,8 @@ public class InputSourceSamplerDiscoveryTest extends InitializedNullHandlingTest
.add("long", ColumnType.LONG)
.add("double", ColumnType.DOUBLE)
.add("bool", ColumnType.STRING)
-.add("variant", ColumnType.NESTED_DATA)
-.add("array", ColumnType.NESTED_DATA)
+.add("variant", ColumnType.STRING)
+.add("array", ColumnType.LONG_ARRAY)
.add("nested", ColumnType.NESTED_DATA)
.build(),
response.getLogicalSegmentSchema()

View File

@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -65,7 +65,7 @@ public class SamplerResponseTest
new StringDimensionSchema("dim1")
),
ImmutableList.of(
-new NestedDataDimensionSchema("dim1")
+new AutoTypeColumnSchema("dim1")
),
RowSignature.builder().addTimeColumn().add("dim1", ColumnType.STRING).add("met1", ColumnType.LONG).build(),
data

View File

@@ -32,7 +32,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
@@ -51,7 +50,7 @@ import java.util.Objects;
@JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class),
@JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class),
-@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class),
+@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = AutoTypeColumnSchema.class),
@JsonSubTypes.Type(name = AutoTypeColumnSchema.TYPE, value = AutoTypeColumnSchema.class)
})
public abstract class DimensionSchema
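The subtype change above means that a dimension spec using the legacy `json` type name (`NestedDataComplexTypeSerde.TYPE_NAME`) and one using `auto` (`AutoTypeColumnSchema.TYPE`) now both deserialize to `AutoTypeColumnSchema`. A minimal sketch of two specs that should now map to the same schema class (the column name `nested` is arbitrary):

```json
[
  { "type": "json", "name": "nested" },
  { "type": "auto", "name": "nested" }
]
```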

View File

@@ -26,7 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.segment.NestedDataDimensionHandler;
+import org.apache.druid.segment.NestedCommonFormatColumnHandler;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.nested.StructuredDataJsonSerializer;
@@ -56,11 +56,10 @@ public class NestedDataModule implements DruidModule
{
if (ComplexMetrics.getSerdeForType(NestedDataComplexTypeSerde.TYPE_NAME) == null) {
ComplexMetrics.registerSerde(NestedDataComplexTypeSerde.TYPE_NAME, NestedDataComplexTypeSerde.INSTANCE);
}
DimensionHandlerUtils.registerDimensionHandlerProvider(
NestedDataComplexTypeSerde.TYPE_NAME,
-NestedDataDimensionHandler::new
+NestedCommonFormatColumnHandler::new
);
}
@@ -68,9 +67,7 @@
{
return Collections.singletonList(
new SimpleModule("NestedDataModule")
-.registerSubtypes(
-new NamedType(NestedFieldVirtualColumn.class, "nested-field")
-)
+.registerSubtypes(new NamedType(NestedFieldVirtualColumn.class, "nested-field"))
.addSerializer(StructuredData.class, new StructuredDataJsonSerializer())
);
}

View File

@@ -66,8 +66,7 @@ import java.util.TreeMap;
*/
public class AutoTypeColumnMerger implements DimensionMergerV9
{
-private static final Logger log = new Logger(NestedDataColumnMerger.class);
+private static final Logger log = new Logger(AutoTypeColumnMerger.class);
public static final Comparator<PeekingIterator<String>> STRING_MERGING_COMPARATOR =
SimpleDictionaryMergingIterator.makePeekingComparator();
public static final Comparator<PeekingIterator<Long>> LONG_MERGING_COMPARATOR =

View File

@@ -1,501 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnFormat;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
import org.apache.druid.segment.nested.FieldTypeInfo;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.NestedPathFinder;
import org.apache.druid.segment.nested.NestedPathPart;
import org.apache.druid.segment.nested.SortedValueDictionary;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.nested.StructuredDataProcessor;
import org.apache.druid.segment.nested.ValueDictionary;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
public class NestedDataColumnIndexer implements DimensionIndexer<StructuredData, StructuredData, StructuredData>
{
private static final ColumnFormat FORMAT = new NestedDataComplexTypeSerde.NestedColumnFormatV4();
protected volatile boolean hasNulls = false;
protected SortedMap<String, FieldIndexer> fieldIndexers = new TreeMap<>();
protected final ValueDictionary globalDictionary = new ValueDictionary();
int estimatedFieldKeySize = 0;
protected final StructuredDataProcessor indexerProcessor = new StructuredDataProcessor()
{
@Override
public ProcessedValue<?> processField(ArrayList<NestedPathPart> fieldPath, @Nullable Object fieldValue)
{
// null value is always added to the global dictionary as id 0, so we can ignore them here
if (fieldValue != null) {
final String fieldName = NestedPathFinder.toNormalizedJsonPath(fieldPath);
ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
FieldIndexer fieldIndexer = fieldIndexers.get(fieldName);
if (fieldIndexer == null) {
estimatedFieldKeySize += StructuredDataProcessor.estimateStringSize(fieldName);
fieldIndexer = new FieldIndexer(globalDictionary);
fieldIndexers.put(fieldName, fieldIndexer);
}
return fieldIndexer.processValue(eval);
}
return ProcessedValue.NULL_LITERAL;
}
@Nullable
@Override
public ProcessedValue<?> processArrayField(
ArrayList<NestedPathPart> fieldPath,
@Nullable List<?> array
)
{
// classic nested data column indexer does not handle arrays
return null;
}
};
@Override
public EncodedKeyComponent<StructuredData> processRowValsToUnsortedEncodedKeyComponent(
@Nullable Object dimValues,
boolean reportParseExceptions
)
{
final long oldDictSizeInBytes = globalDictionary.sizeInBytes();
final int oldFieldKeySize = estimatedFieldKeySize;
final StructuredData data;
if (dimValues == null) {
hasNulls = true;
data = null;
} else if (dimValues instanceof StructuredData) {
data = (StructuredData) dimValues;
} else {
data = new StructuredData(dimValues);
}
StructuredDataProcessor.ProcessResults info = indexerProcessor.processFields(data == null ? null : data.getValue());
// 'raw' data is currently preserved 'as-is', and not replaced with object references to the global dictionaries
long effectiveSizeBytes = info.getEstimatedSize();
// then, we add the delta of size change to the global dictionaries to account for any new space added by the
// 'raw' data
effectiveSizeBytes += (globalDictionary.sizeInBytes() - oldDictSizeInBytes);
effectiveSizeBytes += (estimatedFieldKeySize - oldFieldKeySize);
return new EncodedKeyComponent<>(data, effectiveSizeBytes);
}
@Override
public void setSparseIndexed()
{
this.hasNulls = true;
}
@Override
public StructuredData getUnsortedEncodedValueFromSorted(StructuredData sortedIntermediateValue)
{
return sortedIntermediateValue;
}
@Override
public CloseableIndexed<StructuredData> getSortedIndexedValues()
{
throw new UnsupportedOperationException("Not supported");
}
@Override
public StructuredData getMinValue()
{
throw new UnsupportedOperationException("Not supported");
}
@Override
public StructuredData getMaxValue()
{
throw new UnsupportedOperationException("Not supported");
}
@Override
public int getCardinality()
{
return globalDictionary.getCardinality();
}
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec spec,
IncrementalIndexRowHolder currEntry,
IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
final ColumnValueSelector<?> rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex);
if (rootLiteralSelector != null) {
return new BaseSingleValueDimensionSelector()
{
@Nullable
@Override
protected String getValue()
{
final Object o = rootLiteralSelector.getObject();
if (o == null) {
return null;
}
return o.toString();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
};
}
// column has nested data or is of mixed root type, cannot use
throw new UOE(
"makeDimensionSelector is not supported, column [%s] is [%s] typed and should only use makeColumnValueSelector",
spec.getOutputName(),
ColumnType.NESTED_DATA
);
}
@Override
public ColumnValueSelector<?> makeColumnValueSelector(
IncrementalIndexRowHolder currEntry,
IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
final ColumnValueSelector<?> rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex);
if (rootLiteralSelector != null) {
return rootLiteralSelector;
}
return new ObjectColumnSelector<StructuredData>()
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
@Nullable
@Override
public StructuredData getObject()
{
final Object[] dims = currEntry.get().getDims();
if (0 <= dimIndex && dimIndex < dims.length) {
return (StructuredData) dims[dimIndex];
} else {
return null;
}
}
@Override
public Class<StructuredData> classOfObject()
{
return StructuredData.class;
}
};
}
private ColumnType getLogicalType()
{
if (fieldIndexers.size() == 1 && fieldIndexers.containsKey(NestedPathFinder.JSON_PATH_ROOT)) {
FieldIndexer rootField = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT);
ColumnType singleType = rootField.getTypes().getSingleType();
return singleType == null ? ColumnType.NESTED_DATA : singleType;
}
return ColumnType.NESTED_DATA;
}
@Override
public ColumnCapabilities getColumnCapabilities()
{
return ColumnCapabilitiesImpl.createDefault()
.setType(getLogicalType())
.setHasNulls(hasNulls);
}
@Override
public ColumnFormat getFormat()
{
return FORMAT;
}
public SortedValueDictionary getSortedValueLookups()
{
return globalDictionary.getSortedCollector();
}
public SortedMap<String, FieldTypeInfo.MutableTypeSet> getFieldTypeInfo()
{
TreeMap<String, FieldTypeInfo.MutableTypeSet> fields = new TreeMap<>();
for (Map.Entry<String, FieldIndexer> entry : fieldIndexers.entrySet()) {
// skip adding the field if no types are in the set, meaning only null values have been processed
if (!entry.getValue().getTypes().isEmpty()) {
fields.put(entry.getKey(), entry.getValue().getTypes());
}
}
return fields;
}
@Override
public int compareUnsortedEncodedKeyComponents(
@Nullable StructuredData lhs,
@Nullable StructuredData rhs
)
{
return StructuredData.COMPARATOR.compare(lhs, rhs);
}
@Override
public boolean checkUnsortedEncodedKeyComponentsEqual(
@Nullable StructuredData lhs,
@Nullable StructuredData rhs
)
{
return Objects.equals(lhs, rhs);
}
@Override
public int getUnsortedEncodedKeyComponentHashCode(@Nullable StructuredData key)
{
return Objects.hash(key);
}
@Override
public Object convertUnsortedEncodedKeyComponentToActualList(StructuredData key)
{
return key;
}
@Override
public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues)
{
final FieldIndexer rootIndexer = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT);
if (fieldIndexers.size() == 1 && rootIndexer != null && rootIndexer.isSingleType()) {
// for root only literals, makeColumnValueSelector and makeDimensionSelector automatically unwrap StructuredData
// we need to do the opposite here, wrapping selector values with a StructuredData so that they are consistently
// typed for the merger
return new ColumnValueSelector<StructuredData>()
{
@Override
public boolean isNull()
{
return selectorWithUnsortedValues.isNull();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
selectorWithUnsortedValues.inspectRuntimeShape(inspector);
}
@Nullable
@Override
public StructuredData getObject()
{
return StructuredData.wrap(selectorWithUnsortedValues.getObject());
}
@Override
public float getFloat()
{
return selectorWithUnsortedValues.getFloat();
}
@Override
public double getDouble()
{
return selectorWithUnsortedValues.getDouble();
}
@Override
public long getLong()
{
return selectorWithUnsortedValues.getLong();
}
@Override
public Class<StructuredData> classOfObject()
{
return StructuredData.class;
}
};
}
return selectorWithUnsortedValues;
}
@Override
public void fillBitmapsFromUnsortedEncodedKeyComponent(
StructuredData key,
int rowNum,
MutableBitmap[] bitmapIndexes,
BitmapFactory factory
)
{
throw new UnsupportedOperationException("Not supported");
}
@Nullable
private ColumnValueSelector<?> getRootLiteralValueSelector(
IncrementalIndexRowHolder currEntry,
int dimIndex
)
{
if (fieldIndexers.size() > 1) {
return null;
}
final FieldIndexer root = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT);
if (root == null || !root.isSingleType()) {
return null;
}
return new ColumnValueSelector<Object>()
{
@Override
public boolean isNull()
{
final Object o = getObject();
return !(o instanceof Number);
}
@Override
public float getFloat()
{
Object value = getObject();
if (value == null) {
return 0;
}
return ((Number) value).floatValue();
}
@Override
public double getDouble()
{
Object value = getObject();
if (value == null) {
return 0;
}
return ((Number) value).doubleValue();
}
@Override
public long getLong()
{
Object value = getObject();
if (value == null) {
return 0;
}
return ((Number) value).longValue();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
@Nullable
@Override
public Object getObject()
{
final Object[] dims = currEntry.get().getDims();
if (0 <= dimIndex && dimIndex < dims.length) {
final StructuredData data = (StructuredData) dims[dimIndex];
if (data != null) {
return ExprEval.bestEffortOf(data.getValue()).valueOrDefault();
}
}
return null;
}
@Override
public Class<?> classOfObject()
{
return Object.class;
}
};
}
static class FieldIndexer
{
private final ValueDictionary valueDictionary;
private final FieldTypeInfo.MutableTypeSet typeSet;
FieldIndexer(ValueDictionary valueDictionary)
{
this.valueDictionary = valueDictionary;
this.typeSet = new FieldTypeInfo.MutableTypeSet();
}
private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?> eval)
{
final ColumnType columnType = ExpressionType.toColumnType(eval.type());
int sizeEstimate;
switch (columnType.getType()) {
case LONG:
typeSet.add(ColumnType.LONG);
sizeEstimate = valueDictionary.addLongValue(eval.asLong());
return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(), sizeEstimate);
case DOUBLE:
typeSet.add(ColumnType.DOUBLE);
sizeEstimate = valueDictionary.addDoubleValue(eval.asDouble());
return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(), sizeEstimate);
case STRING:
typeSet.add(ColumnType.STRING);
final String asString = eval.asString();
sizeEstimate = valueDictionary.addStringValue(asString);
return new StructuredDataProcessor.ProcessedValue<>(asString, sizeEstimate);
default:
throw new IAE("Unhandled type: %s", columnType);
}
}
public FieldTypeInfo.MutableTypeSet getTypes()
{
return typeSet;
}
public boolean isSingleType()
{
return typeSet.getSingleType() != null;
}
}
}

View File

@@ -1,216 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.nested.FieldTypeInfo;
import org.apache.druid.segment.nested.NestedDataColumnSerializerV4;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.SortedValueDictionary;
import org.apache.druid.segment.serde.ComplexColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.IntBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
public class NestedDataColumnMerger implements DimensionMergerV9
{
private static final Logger log = new Logger(NestedDataColumnMerger.class);
public static final Comparator<PeekingIterator<String>> STRING_MERGING_COMPARATOR =
SimpleDictionaryMergingIterator.makePeekingComparator();
public static final Comparator<PeekingIterator<Long>> LONG_MERGING_COMPARATOR =
SimpleDictionaryMergingIterator.makePeekingComparator();
public static final Comparator<PeekingIterator<Double>> DOUBLE_MERGING_COMPARATOR =
SimpleDictionaryMergingIterator.makePeekingComparator();
private final String name;
private final IndexSpec indexSpec;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final Closer closer;
private ColumnDescriptor.Builder descriptorBuilder;
private GenericColumnSerializer<?> serializer;
public NestedDataColumnMerger(
String name,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
Closer closer
)
{
this.name = name;
this.indexSpec = indexSpec;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.closer = closer;
}
@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
{
try {
long dimStartTime = System.currentTimeMillis();
int numMergeIndex = 0;
SortedValueDictionary sortedLookup = null;
final Indexed[] sortedLookups = new Indexed[adapters.size()];
final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
final SortedMap<String, FieldTypeInfo.MutableTypeSet> mergedFields = new TreeMap<>();
for (int i = 0; i < adapters.size(); i++) {
final IndexableAdapter adapter = adapters.get(i);
final IndexableAdapter.NestedColumnMergable mergable = closer.register(
adapter.getNestedColumnMergeables(name)
);
if (mergable == null) {
continue;
}
final SortedValueDictionary dimValues = mergable.getValueDictionary();
boolean allNulls = dimValues == null || dimValues.allNull();
if (!allNulls) {
sortedLookup = dimValues;
mergable.mergeFieldsInto(mergedFields);
sortedLookups[i] = dimValues.getSortedStrings();
sortedLongLookups[i] = dimValues.getSortedLongs();
sortedDoubleLookups[i] = dimValues.getSortedDoubles();
numMergeIndex++;
}
}
descriptorBuilder = new ColumnDescriptor.Builder();
final NestedDataColumnSerializerV4 defaultSerializer = new NestedDataColumnSerializerV4(
name,
indexSpec,
segmentWriteOutMedium,
closer
);
serializer = defaultSerializer;
final ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.serializerBuilder()
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
.withDelegate(serializer)
.build();
descriptorBuilder.setValueType(ValueType.COMPLEX)
.setHasMultipleValues(false)
.addSerde(partSerde);
defaultSerializer.open();
defaultSerializer.serializeFields(mergedFields);
int stringCardinality;
int longCardinality;
int doubleCardinality;
if (numMergeIndex == 1) {
defaultSerializer.serializeDictionaries(
sortedLookup.getSortedStrings(),
sortedLookup.getSortedLongs(),
sortedLookup.getSortedDoubles()
);
stringCardinality = sortedLookup.getStringCardinality();
longCardinality = sortedLookup.getLongCardinality();
doubleCardinality = sortedLookup.getDoubleCardinality();
} else {
final SimpleDictionaryMergingIterator<String> stringIterator = new SimpleDictionaryMergingIterator<>(
sortedLookups,
STRING_MERGING_COMPARATOR
);
final SimpleDictionaryMergingIterator<Long> longIterator = new SimpleDictionaryMergingIterator<>(
sortedLongLookups,
LONG_MERGING_COMPARATOR
);
final SimpleDictionaryMergingIterator<Double> doubleIterator = new SimpleDictionaryMergingIterator<>(
sortedDoubleLookups,
DOUBLE_MERGING_COMPARATOR
);
defaultSerializer.serializeDictionaries(
() -> stringIterator,
() -> longIterator,
() -> doubleIterator
);
stringCardinality = stringIterator.getCardinality();
longCardinality = longIterator.getCardinality();
doubleCardinality = doubleIterator.getCardinality();
}
log.debug(
"Completed dim[%s] conversions with string cardinality[%,d], long cardinality[%,d], double cardinality[%,d] in %,d millis.",
name,
stringCardinality,
longCardinality,
doubleCardinality,
System.currentTimeMillis() - dimStartTime
);
}
catch (IOException ioe) {
log.error(ioe, "Failed to merge dictionary for column [%s]", name);
throw ioe;
}
}
@Override
public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues(
int segmentIndex,
ColumnValueSelector source
)
{
return source;
}
@Override
public void processMergedRow(ColumnValueSelector selector) throws IOException
{
serializer.serialize(selector);
}
@Override
public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions)
{
// fields write their own indexes
}
@Override
public boolean hasOnlyNulls()
{
return false;
}
@Override
public ColumnDescriptor makeColumnDescriptor()
{
return descriptorBuilder.build();
}
}

View File

@@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.util.Comparator;
public class NestedDataDimensionHandler implements DimensionHandler<StructuredData, StructuredData, StructuredData>
{
private static Comparator<ColumnValueSelector> COMPARATOR = (s1, s2) ->
StructuredData.COMPARATOR.compare(
StructuredData.wrap(s1.getObject()),
StructuredData.wrap(s2.getObject())
);
private final String name;
public NestedDataDimensionHandler(String name)
{
this.name = name;
}
@Override
public String getDimensionName()
{
return name;
}
@Override
public DimensionSpec getDimensionSpec()
{
return new DefaultDimensionSpec(name, name, ColumnType.NESTED_DATA);
}
@Override
public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities)
{
return new NestedDataDimensionSchema(name);
}
@Override
public DimensionIndexer<StructuredData, StructuredData, StructuredData> makeIndexer(boolean useMaxMemoryEstimates)
{
return new NestedDataColumnIndexer();
}
@Override
public DimensionMergerV9 makeMerger(
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
Closer closer
)
{
return new NestedDataColumnMerger(name, indexSpec, segmentWriteOutMedium, closer);
}
@Override
public int getLengthOfEncodedKeyComponent(StructuredData dimVals)
{
// this is called in one place, OnheapIncrementalIndex, where returning 0 here means the value is null
// so the actual value we return here doesn't matter. we should consider refactoring this to a boolean
return 1;
}
@Override
public Comparator<ColumnValueSelector> getEncodedValueSelectorComparator()
{
return COMPARATOR;
}
@Override
public SettableColumnValueSelector makeNewSettableEncodedValueSelector()
{
return new SettableObjectColumnValueSelector();
}
}

View File

@@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
public class NestedDataDimensionSchema extends DimensionSchema
{
@JsonCreator
public NestedDataDimensionSchema(
@JsonProperty("name") String name
)
{
super(name, null, true);
}
@Override
public String getTypeName()
{
return NestedDataComplexTypeSerde.TYPE_NAME;
}
@Override
public ColumnType getColumnType()
{
return ColumnType.NESTED_DATA;
}
@Override
public DimensionHandler getDimensionHandler()
{
return new NestedDataDimensionHandler(getName());
}
}

View File

@@ -27,7 +27,6 @@ import org.apache.druid.segment.DimensionIndexer;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.IntIteratorUtils;
import org.apache.druid.segment.Metadata;
-import org.apache.druid.segment.NestedDataColumnIndexer;
import org.apache.druid.segment.TransformableRowIterator;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnFormat;
@@ -151,17 +150,6 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
final DimensionIndexer indexer = accessor.dimensionDesc.getIndexer();
-if (indexer instanceof NestedDataColumnIndexer) {
-NestedDataColumnIndexer nestedDataColumnIndexer = (NestedDataColumnIndexer) indexer;
-return new NestedColumnMergable(
-nestedDataColumnIndexer.getSortedValueLookups(),
-nestedDataColumnIndexer.getFieldTypeInfo(),
-true,
-false,
-null
-);
-}
if (indexer instanceof AutoTypeColumnIndexer) {
AutoTypeColumnIndexer autoIndexer = (AutoTypeColumnIndexer) indexer;
return new NestedColumnMergable(

View File

@@ -34,13 +34,11 @@ import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionIndexer;
import org.apache.druid.segment.Metadata;
-import org.apache.druid.segment.NestedDataColumnIndexer;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
-import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.BooleanValueMatcher;
@@ -209,12 +207,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
-IncrementalIndex.DimensionDesc desc = index.getDimension(column);
-// nested column indexer is a liar, and behaves like any type if it only processes unnested literals of a single type
-// so keep it in the family so to speak
-if (desc != null && desc.getIndexer() instanceof NestedDataColumnIndexer) {
-return ColumnCapabilitiesImpl.createDefault().setType(ColumnType.NESTED_DATA);
-}
// Different from index.getColumnCapabilities because, in a way, IncrementalIndex's string-typed dimensions
// are always potentially multi-valued at query time. (Missing / null values for a row can potentially be
// represented by an empty array; see StringDimensionIndexer.IndexerDimensionSelector's getRow method.)

View File

@@ -32,8 +32,8 @@ import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import javax.annotation.Nullable;
/**
-* Ingestion time dictionary identifier lookup, used by {@link NestedDataColumnSerializerV4} to build a global dictionary
-* id to value mapping for the 'stacked' global value dictionaries.
+* Ingestion time dictionary identifier lookup, used by {@link NestedCommonFormatColumnSerializer} to build a global
+* dictionary id to value mapping for the 'stacked' global value dictionaries.
*/
public class DictionaryIdLookup
{

View File

@@ -50,12 +50,12 @@ import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
-* Base class for writer of global dictionary encoded nested field columns for {@link NestedDataColumnSerializerV4} and
-* {@link NestedDataColumnSerializer}. Nested columns are written in multiple passes. The first pass processes the
-* 'raw' nested data with a {@link StructuredDataProcessor} which will call {@link #addValue(int, Object)} for writers
-* of each field which is present. For this type of writer, this entails building a local dictionary
-* ({@link #localDictionary})to map into to the global dictionary ({@link #globalDictionaryIdLookup}) and writes this
-* unsorted localId to an intermediate integer column, {@link #intermediateValueWriter}.
+* Base class for writer of global dictionary encoded nested field columns for {@link NestedDataColumnSerializer}.
+* Nested columns are written in multiple passes. The first pass processes the 'raw' nested data with a
+* {@link StructuredDataProcessor} which will call {@link #addValue(int, Object)} for writers of each field which is
+* present. For this type of writer, this entails building a local dictionary ({@link #localDictionary}) to map into
+* the global dictionary ({@link #globalDictionaryIdLookup}) and writes this unsorted localId to an intermediate
+* integer column, {@link #intermediateValueWriter}.
* <p>
* When processing the 'raw' value column is complete, the {@link #writeTo(int, FileSmoosher)} method will sort the
* local ids and write them out to a local sorted dictionary, iterate over {@link #intermediateValueWriter} swapping
@@ -292,7 +292,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
}
}
};
-final String fieldFileName = NestedDataColumnSerializerV4.getInternalFileName(columnName, fieldName);
+final String fieldFileName = NestedCommonFormatColumnSerializer.getInternalFileName(columnName, fieldName);
final long size = fieldSerializer.getSerializedSize();
log.debug("Column [%s] serializing [%s] field of size [%d].", columnName, fieldName, size);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldFileName, size)) {

View File

@@ -46,10 +46,6 @@ import java.util.SortedMap;
*
* @see NestedDataColumnSerializer - nested columns
*
-* @see NestedDataColumnSerializerV4 - legacy nested column format created by
-* {@link org.apache.druid.segment.NestedDataColumnIndexer} and
-* {@link org.apache.druid.segment.NestedDataColumnMerger}
-*
*/
public abstract class NestedCommonFormatColumnSerializer implements GenericColumnSerializer<StructuredData>
{

View File

@@ -1,399 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ByteBufferWriter;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
public class NestedDataColumnSerializerV4 implements GenericColumnSerializer<StructuredData>
{
private static final Logger log = new Logger(NestedDataColumnSerializerV4.class);
public static final String STRING_DICTIONARY_FILE_NAME = "__stringDictionary";
public static final String LONG_DICTIONARY_FILE_NAME = "__longDictionary";
public static final String DOUBLE_DICTIONARY_FILE_NAME = "__doubleDictionary";
public static final String ARRAY_DICTIONARY_FILE_NAME = "__arrayDictionary";
public static final String RAW_FILE_NAME = "__raw";
public static final String NULL_BITMAP_FILE_NAME = "__nullIndex";
public static final String NESTED_FIELD_PREFIX = "__field_";
private final String name;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final IndexSpec indexSpec;
@SuppressWarnings("unused")
private final Closer closer;
private final StructuredDataProcessor fieldProcessor = new StructuredDataProcessor()
{
@Override
public ProcessedValue<?> processField(ArrayList<NestedPathPart> fieldPath, @Nullable Object fieldValue)
{
final GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.get(
NestedPathFinder.toNormalizedJsonPath(fieldPath)
);
if (writer != null) {
try {
final ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
if (eval.type().isPrimitive() || eval.type().isPrimitiveArray()) {
writer.addValue(rowCount, eval.value());
} else {
// behave consistently with nested column indexer, which defaults to string
writer.addValue(rowCount, eval.asString());
}
// serializer doesn't use size estimate
return ProcessedValue.NULL_LITERAL;
}
catch (IOException e) {
throw new RE(e, "Failed to write field [%s], unhandled value", fieldPath);
}
}
return ProcessedValue.NULL_LITERAL;
}
@Nullable
@Override
public ProcessedValue<?> processArrayField(
ArrayList<NestedPathPart> fieldPath,
@Nullable List<?> array
)
{
// classic nested column ingestion does not support array fields
return null;
}
};
private byte[] metadataBytes;
private DictionaryIdLookup globalDictionaryIdLookup;
private SortedMap<String, FieldTypeInfo.MutableTypeSet> fields;
private GenericIndexedWriter<String> fieldsWriter;
private FieldTypeInfo.Writer fieldsInfoWriter;
private DictionaryWriter<String> dictionaryWriter;
private FixedIndexedWriter<Long> longDictionaryWriter;
private FixedIndexedWriter<Double> doubleDictionaryWriter;
private CompressedVariableSizedBlobColumnSerializer rawWriter;
private ByteBufferWriter<ImmutableBitmap> nullBitmapWriter;
private MutableBitmap nullRowsBitmap;
private Map<String, GlobalDictionaryEncodedFieldColumnWriter<?>> fieldWriters;
private int rowCount = 0;
private boolean closedForWrite = false;
private boolean dictionarySerialized = false;
public NestedDataColumnSerializerV4(
String name,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
Closer closer
)
{
this.name = name;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.closer = closer;
this.globalDictionaryIdLookup = new DictionaryIdLookup();
}
@Override
public void open() throws IOException
{
fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
fieldsWriter.open();
fieldsInfoWriter = new FieldTypeInfo.Writer(segmentWriteOutMedium);
fieldsInfoWriter.open();
dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
indexSpec.getStringDictionaryEncoding(),
segmentWriteOutMedium,
name
);
dictionaryWriter.open();
longDictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
ColumnType.LONG.getStrategy(),
ByteOrder.nativeOrder(),
Long.BYTES,
true
);
longDictionaryWriter.open();
doubleDictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
ColumnType.DOUBLE.getStrategy(),
ByteOrder.nativeOrder(),
Double.BYTES,
true
);
doubleDictionaryWriter.open();
rawWriter = new CompressedVariableSizedBlobColumnSerializer(
getInternalFileName(name, RAW_FILE_NAME),
segmentWriteOutMedium,
indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4
);
rawWriter.open();
nullBitmapWriter = new ByteBufferWriter<>(
segmentWriteOutMedium,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
nullBitmapWriter.open();
nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
public void serializeFields(SortedMap<String, FieldTypeInfo.MutableTypeSet> fields) throws IOException
{
this.fields = fields;
this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size());
int ctr = 0;
for (Map.Entry<String, FieldTypeInfo.MutableTypeSet> field : fields.entrySet()) {
final String fieldName = field.getKey();
final String fieldFileName = NESTED_FIELD_PREFIX + ctr++;
fieldsWriter.write(fieldName);
fieldsInfoWriter.write(field.getValue());
final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
final ColumnType type = field.getValue().getSingleType();
if (type != null) {
if (Types.is(type, ValueType.STRING)) {
writer = new ScalarStringFieldColumnWriter(
name,
fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
);
} else if (Types.is(type, ValueType.LONG)) {
writer = new ScalarLongFieldColumnWriter(
name,
fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
);
} else if (Types.is(type, ValueType.DOUBLE)) {
writer = new ScalarDoubleFieldColumnWriter(
name,
fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
);
} else {
throw new ISE("Invalid field type [%s], how did this happen?", type);
}
} else {
writer = new VariantFieldColumnWriter(
name,
fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
);
}
writer.open();
fieldWriters.put(fieldName, writer);
}
}
public void serializeDictionaries(
Iterable<String> strings,
Iterable<Long> longs,
Iterable<Double> doubles
) throws IOException
{
if (dictionarySerialized) {
throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name);
}
// null is always 0
dictionaryWriter.write(null);
globalDictionaryIdLookup.addString(null);
for (String value : strings) {
value = NullHandling.emptyToNullIfNeeded(value);
if (value == null) {
continue;
}
dictionaryWriter.write(value);
globalDictionaryIdLookup.addString(value);
}
dictionarySerialized = true;
for (Long value : longs) {
if (value == null) {
continue;
}
longDictionaryWriter.write(value);
globalDictionaryIdLookup.addLong(value);
}
for (Double value : doubles) {
if (value == null) {
continue;
}
doubleDictionaryWriter.write(value);
globalDictionaryIdLookup.addDouble(value);
}
dictionarySerialized = true;
}
@Override
public void serialize(ColumnValueSelector<? extends StructuredData> selector) throws IOException
{
final StructuredData data = StructuredData.wrap(selector.getObject());
if (data == null) {
nullRowsBitmap.add(rowCount);
}
rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data));
if (data != null) {
fieldProcessor.processFields(data.getValue());
}
rowCount++;
}
@Override
public long getSerializedSize() throws IOException
{
if (!closedForWrite) {
closedForWrite = true;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IndexMerger.SERIALIZER_UTILS.writeString(
baos,
NestedDataComplexTypeSerde.OBJECT_MAPPER.writeValueAsString(
new NestedDataColumnMetadata(
ByteOrder.nativeOrder(),
indexSpec.getBitmapSerdeFactory(),
name,
!nullRowsBitmap.isEmpty()
)
)
);
this.metadataBytes = baos.toByteArray();
this.nullBitmapWriter.write(nullRowsBitmap);
}
long size = 1;
size += metadataBytes.length;
if (fieldsWriter != null) {
size += fieldsWriter.getSerializedSize();
}
if (fieldsInfoWriter != null) {
size += fieldsInfoWriter.getSerializedSize();
}
// the value dictionaries, raw column, and null index are all stored in separate files
return size;
}
@Override
public void writeTo(
WritableByteChannel channel,
FileSmoosher smoosher
) throws IOException
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?");
// version 4
channel.write(ByteBuffer.wrap(new byte[]{0x04}));
channel.write(ByteBuffer.wrap(metadataBytes));
fieldsWriter.writeTo(channel, smoosher);
fieldsInfoWriter.writeTo(channel, smoosher);
// version 3 stores large components in separate files to prevent exceeding smoosh file limit (int max)
writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, rawWriter, RAW_FILE_NAME);
if (!nullRowsBitmap.isEmpty()) {
writeInternal(smoosher, nullBitmapWriter, NULL_BITMAP_FILE_NAME);
}
// close the SmooshedWriter since we are done here, so we don't write to a temporary file per sub-column
// In the future, it would be best if the writeTo() itself didn't take a channel but was expected to actually
// open its own channels on the FileSmoosher object itself. Or some other thing that give this Serializer
// total control over when resources are opened up and when they are closed. Until then, we are stuck
// with a very tight coupling of this code with how the external "driver" is working.
if (channel instanceof SmooshedWriter) {
channel.close();
}
for (Map.Entry<String, FieldTypeInfo.MutableTypeSet> field : fields.entrySet()) {
// remove writer so that it can be collected when we are done with it
GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
writer.writeTo(rowCount, smoosher);
}
log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size());
}
private void writeInternal(FileSmoosher smoosher, Serializer serializer, String fileName) throws IOException
{
final String internalName = getInternalFileName(name, fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
serializer.writeTo(smooshChannel, smoosher);
}
}
public static String getInternalFileName(String fileNameBase, String field)
{
return StringUtils.format("%s.%s", fileNameBase, field);
}
}

View File

@@ -115,7 +115,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer stringDictionaryBuffer = loadInternalFile(
mapper,
metadata,
-NestedDataColumnSerializerV4.STRING_DICTIONARY_FILE_NAME
+NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
);
final int dictionaryStartPosition = stringDictionaryBuffer.position();
@ -149,7 +149,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer longDictionaryBuffer = loadInternalFile(
mapper,
metadata,
NestedDataColumnSerializerV4.LONG_DICTIONARY_FILE_NAME
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME
);
longDictionarySupplier = FixedIndexed.read(
longDictionaryBuffer,
@ -160,7 +160,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer doubleDictionaryBuffer = loadInternalFile(
mapper,
metadata,
NestedDataColumnSerializerV4.DOUBLE_DICTIONARY_FILE_NAME
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
);
doubleDictionarySupplier = FixedIndexed.read(
doubleDictionaryBuffer,
@ -172,7 +172,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer arrayDictionarybuffer = loadInternalFile(
mapper,
metadata,
NestedDataColumnSerializerV4.ARRAY_DICTIONARY_FILE_NAME
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME
);
arrayDictionarySupplier = FrontCodedIntArrayIndexed.read(
arrayDictionarybuffer,
@ -181,10 +181,10 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
} else {
arrayDictionarySupplier = null;
}
final ByteBuffer rawBuffer = loadInternalFile(mapper, metadata, NestedDataColumnSerializerV4.RAW_FILE_NAME);
final ByteBuffer rawBuffer = loadInternalFile(mapper, metadata, NestedCommonFormatColumnSerializer.RAW_FILE_NAME);
compressedRawColumnSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
NestedDataColumnSerializerV4.getInternalFileName(
metadata.getFileNameBase(), NestedDataColumnSerializerV4.RAW_FILE_NAME
NestedCommonFormatColumnSerializer.getInternalFileName(
metadata.getFileNameBase(), NestedCommonFormatColumnSerializer.RAW_FILE_NAME
),
rawBuffer,
metadata.getByteOrder(),
@ -195,7 +195,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer nullIndexBuffer = loadInternalFile(
mapper,
metadata,
NestedDataColumnSerializerV4.NULL_BITMAP_FILE_NAME
NestedCommonFormatColumnSerializer.NULL_BITMAP_FILE_NAME
);
nullValues = metadata.getBitmapSerdeFactory().getObjectStrategy().fromByteBufferWithSize(nullIndexBuffer);
} else {
@ -422,7 +422,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
) throws IOException
{
return fileMapper.mapFile(
NestedDataColumnSerializerV4.getInternalFileName(metadata.getFileNameBase(), internalFileName)
NestedCommonFormatColumnSerializer.getInternalFileName(metadata.getFileNameBase(), internalFileName)
);
}
}

View File

@ -81,9 +81,9 @@ public final class NestedDataColumnV4<TStringDictionary extends Indexed<ByteBuff
@Override
public String getFieldFileName(String fileNameBase, String field, int fieldIndex)
{
return NestedDataColumnSerializerV4.getInternalFileName(
return NestedCommonFormatColumnSerializer.getInternalFileName(
fileNameBase,
NestedDataColumnSerializerV4.NESTED_FIELD_PREFIX + fieldIndex
NestedCommonFormatColumnSerializer.NESTED_FIELD_PREFIX + fieldIndex
);
}

View File

@ -28,9 +28,9 @@ import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.NestedDataDimensionHandler;
import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.NestedCommonFormatColumnHandler;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
@ -168,13 +168,13 @@ public class NestedDataComplexTypeSerde extends ComplexMetricSerde
@Override
public DimensionHandler getColumnHandler(String columnName)
{
return new NestedDataDimensionHandler(columnName);
return new NestedCommonFormatColumnHandler(columnName);
}
@Override
public DimensionSchema getColumnSchema(String columnName)
{
return new NestedDataDimensionSchema(columnName);
return new AutoTypeColumnSchema(columnName);
}
@Override
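
With getColumnSchema() now returning AutoTypeColumnSchema, declaring a nested dimension by hand looks like the minimal sketch below; the builder usage mirrors the DimensionsSpec construction that appears later in this diff.

import java.util.Arrays;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.segment.AutoTypeColumnSchema;

public class AutoSchemaExample
{
  public static void main(String[] args)
  {
    // AutoTypeColumnSchema replaces NestedDataDimensionSchema wherever a nested
    // column is declared explicitly in a DimensionsSpec.
    DimensionsSpec dimsSpec = DimensionsSpec.builder()
        .setDimensions(Arrays.asList(new AutoTypeColumnSchema("nest_json")))
        .build();
    System.out.println(dimsSpec.getDimensions());
  }
}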

View File

@ -33,7 +33,7 @@ import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
* Nested field writer for double type columns of {@link NestedDataColumnSerializerV4}. In addition to the normal
* Nested field writer for double type columns of {@link NestedDataColumnSerializer}. In addition to the normal
* dictionary encoded column, this writer also writes an additional double value column with {@link #doublesSerializer},
* which is written to during {@link #addValue}.
*/
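
The dual-write idea in this javadoc (and in the long writer's below) can be sketched without any Druid types. This is illustration only, not the actual writer, which delegates to Druid's dictionary-id and columnar serializers:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DualWriteSketch
{
  // Each scalar field writer records (a) the dictionary id of every value and
  // (b) a parallel primitive column of the raw values, so numeric reads can
  // skip dictionary lookups entirely.
  private final Map<Double, Integer> dictionary = new LinkedHashMap<>();
  private final List<Integer> dictionaryIds = new ArrayList<>();
  private final List<Double> doubleColumn = new ArrayList<>();

  void addValue(double value)
  {
    int id = dictionary.computeIfAbsent(value, v -> dictionary.size());
    dictionaryIds.add(id);   // the dictionary encoded column
    doubleColumn.add(value); // the additional double value column
  }

  public static void main(String[] args)
  {
    DualWriteSketch writer = new DualWriteSketch();
    writer.addValue(1.1);
    writer.addValue(2.2);
    writer.addValue(1.1);
    System.out.println(writer.dictionaryIds + " " + writer.doubleColumn); // [0, 1, 0] [1.1, 2.2, 1.1]
  }
}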

View File

@ -33,7 +33,7 @@ import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
* Nested field writer for long type columns of {@link NestedDataColumnSerializerV4}. In addition to the normal
* Nested field writer for long type columns of {@link NestedDataColumnSerializer}. In addition to the normal
* dictionary encoded column, this writer also writes an additional long value column with {@link #longsSerializer},
* which is written to during {@link #addValue}.
*/

View File

@ -28,8 +28,7 @@ import java.io.IOException;
import java.nio.channels.WritableByteChannel;
/**
* Nested field writer for string type columns of {@link NestedDataColumnSerializerV4} and
* {@link NestedDataColumnSerializer}
* Nested field writer for string type columns of {@link NestedDataColumnSerializer}
*/
public final class ScalarStringFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<String>
{

View File

@ -26,7 +26,6 @@ import org.apache.druid.segment.AutoTypeColumnIndexer;
import org.apache.druid.segment.ComparatorDimensionDictionary;
import org.apache.druid.segment.ComparatorSortedDimensionDictionary;
import org.apache.druid.segment.DimensionDictionary;
import org.apache.druid.segment.NestedDataColumnIndexer;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import org.apache.druid.segment.data.GenericIndexed;
@ -40,9 +39,8 @@ import java.util.Set;
import java.util.TreeSet;
/**
* Used by {@link AutoTypeColumnIndexer} and {@link NestedDataColumnIndexer} to build the
* value dictionary, which can be converted into a {@link SortedValueDictionary} to sort and write out the values to a
* segment with {@link #getSortedCollector()}.
* Used by {@link AutoTypeColumnIndexer} to build the value dictionary, which can be converted into a
* {@link SortedValueDictionary} to sort and write out the values to a segment with {@link #getSortedCollector()}.
*/
public class ValueDictionary
{
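
Conceptually the contract here is "collect distinct values unsorted, emit them sorted". A toy, single-type rendering of that contract, purely for illustration (the real class tracks strings, longs, doubles, and arrays in separate dictionaries):

import java.util.SortedSet;
import java.util.TreeSet;

public class ToyValueDictionary
{
  private final TreeSet<String> strings = new TreeSet<>();

  void add(String value)
  {
    strings.add(value); // duplicates collapse; the tree keeps sorted order
  }

  SortedSet<String> getSortedValues()
  {
    return strings;
  }

  public static void main(String[] args)
  {
    ToyValueDictionary dictionary = new ToyValueDictionary();
    dictionary.add("b");
    dictionary.add("a");
    dictionary.add("b");
    System.out.println(dictionary.getSortedValues()); // [a, b]
  }
}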

View File

@ -28,8 +28,7 @@ import java.io.IOException;
import java.nio.channels.WritableByteChannel;
/**
* Nested field writer for mixed type scalar or array columns of {@link NestedDataColumnSerializerV4} and
* {@link NestedDataColumnSerializer}.
* Nested field writer for mixed type scalar or array columns of {@link NestedDataColumnSerializer}.
*/
public final class VariantFieldColumnWriter extends GlobalDictionaryEncodedFieldColumnWriter<Object>
{

View File

@ -44,7 +44,6 @@ import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
@ -101,20 +100,6 @@ public class NestedDataTestUtils
.useSchemaDiscovery(true)
.build();
public static final DimensionsSpec TSV_V4_SCHEMA =
DimensionsSpec.builder()
.setDimensions(
Arrays.asList(
new NestedDataDimensionSchema("dim"),
new NestedDataDimensionSchema("nest_json"),
new NestedDataDimensionSchema("nester_json"),
new NestedDataDimensionSchema("variant_json"),
new NestedDataDimensionSchema("list_json"),
new NestedDataDimensionSchema("nonexistent")
)
)
.build();
public static final DimensionsSpec TSV_SCHEMA =
DimensionsSpec.builder()
.setDimensions(
@ -184,21 +169,6 @@ public class NestedDataTestUtils
);
}
public static List<Segment> createSimpleSegmentsTsvV4(
TemporaryFolder tempFolder,
Closer closer
)
throws Exception
{
return createSimpleNestedTestDataTsvSegments(
tempFolder,
closer,
Granularities.NONE,
TSV_V4_SCHEMA,
true
);
}
public static List<Segment> createSimpleNestedTestDataTsvSegments(
TemporaryFolder tempFolder,
Closer closer,

View File

@ -265,35 +265,6 @@ public class NestedDataScanQueryTest extends InitializedNullHandlingTest
Assert.assertEquals(resultsSegments.get(0).getEvents().toString(), resultsRealtime.get(0).getEvents().toString());
}
@Test
public void testIngestAndScanSegmentsTsvV4() throws Exception
{
Query<ScanResultValue> scanQuery = Druids.newScanQueryBuilder()
.dataSource("test_datasource")
.intervals(
new MultipleIntervalSegmentSpec(
Collections.singletonList(Intervals.ETERNITY)
)
)
.virtualColumns(
new NestedFieldVirtualColumn("nest", "$.x", "x"),
new NestedFieldVirtualColumn("nester", "$.x[0]", "x_0"),
new NestedFieldVirtualColumn("nester", "$.y.c[1]", "y_c_1")
)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(100)
.context(ImmutableMap.of())
.build();
List<Segment> segs = NestedDataTestUtils.createSimpleSegmentsTsvV4(tempFolder, closer);
final Sequence<ScanResultValue> seq = helper.runQueryOnSegmentsObjs(segs, scanQuery);
List<ScanResultValue> results = seq.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(8, ((List) results.get(0).getEvents()).size());
logResults(results);
}
@Test
public void testIngestAndScanSegmentsTsv() throws Exception
{

View File

@ -1,612 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map;
public class NestedDataColumnIndexerTest extends InitializedNullHandlingTest
{
private static final String TIME_COL = "time";
private static final String STRING_COL = "string";
private static final String STRING_ARRAY_COL = "string_array";
private static final String LONG_COL = "long";
private static final String DOUBLE_COL = "double";
private static final String VARIANT_COL = "variant";
private static final String NESTED_COL = "nested";
@BeforeClass
public static void setup()
{
NestedDataModule.registerHandlersAndSerde();
}
@Test
public void testKeySizeEstimation()
{
NestedDataColumnIndexer indexer = new NestedDataColumnIndexer();
int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2;
Assert.assertEquals(baseCardinality, indexer.getCardinality());
EncodedKeyComponent<StructuredData> key;
// new raw value, new field, new dictionary entry
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false);
Assert.assertEquals(228, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 1, indexer.getCardinality());
// adding same value only adds estimated size of value itself
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false);
Assert.assertEquals(112, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 1, indexer.getCardinality());
// new raw value, new field, new dictionary entry
key = indexer.processRowValsToUnsortedEncodedKeyComponent(10L, false);
Assert.assertEquals(94, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 2, indexer.getCardinality());
// adding same value only adds estimated size of value itself
key = indexer.processRowValsToUnsortedEncodedKeyComponent(10L, false);
Assert.assertEquals(16, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 2, indexer.getCardinality());
// new raw value, new dictionary entry
key = indexer.processRowValsToUnsortedEncodedKeyComponent(11L, false);
Assert.assertEquals(48, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 3, indexer.getCardinality());
// new raw value, new fields
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 10L), false);
Assert.assertEquals(276, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 5, indexer.getCardinality());
// new raw value, re-use fields and dictionary
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 10L), false);
Assert.assertEquals(56, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 5, indexer.getCardinality());
// new raw value, new fields
key = indexer.processRowValsToUnsortedEncodedKeyComponent(
ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)),
false
);
Assert.assertEquals(286, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 5, indexer.getCardinality());
// new raw value
key = indexer.processRowValsToUnsortedEncodedKeyComponent(
ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)),
false
);
Assert.assertEquals(118, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 5, indexer.getCardinality());
key = indexer.processRowValsToUnsortedEncodedKeyComponent("", false);
if (NullHandling.replaceWithDefault()) {
Assert.assertEquals(0, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 6, indexer.getCardinality());
} else {
Assert.assertEquals(104, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 6, indexer.getCardinality());
}
key = indexer.processRowValsToUnsortedEncodedKeyComponent(0, false);
if (NullHandling.replaceWithDefault()) {
Assert.assertEquals(16, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 6, indexer.getCardinality());
} else {
Assert.assertEquals(48, key.getEffectiveSizeBytes());
Assert.assertEquals(baseCardinality + 7, indexer.getCardinality());
}
}
@Test
public void testNestedColumnIndexerSchemaDiscoveryRootString() throws IndexSizeExceededException
{
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = makeIncrementalIndex(minTimestamp);
index.add(makeInputRow(minTimestamp + 1, true, STRING_COL, "a"));
index.add(makeInputRow(minTimestamp + 2, true, STRING_COL, "b"));
index.add(makeInputRow(minTimestamp + 3, true, STRING_COL, "c"));
index.add(makeInputRow(minTimestamp + 4, true, STRING_COL, null));
index.add(makeInputRow(minTimestamp + 5, false, STRING_COL, null));
IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.NONE,
false,
null
);
final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_COL, STRING_COL, ColumnType.STRING);
List<Cursor> cursorList = cursorSequence.toList();
ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory();
ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL);
DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals("a", valueSelector.getObject());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("a", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("a", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals("b", valueSelector.getObject());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("b", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("b", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals("c", valueSelector.getObject());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("c", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("c", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertNull(valueSelector.getObject());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertNull(dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertNull(valueSelector.getObject());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertNull(dimensionSelector.getObject());
}
@Test
public void testNestedColumnIndexerSchemaDiscoveryRootLong() throws IndexSizeExceededException
{
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = makeIncrementalIndex(minTimestamp);
index.add(makeInputRow(minTimestamp + 1, true, LONG_COL, 1L));
index.add(makeInputRow(minTimestamp + 2, true, LONG_COL, 2L));
index.add(makeInputRow(minTimestamp + 3, true, LONG_COL, 3L));
index.add(makeInputRow(minTimestamp + 4, true, LONG_COL, null));
index.add(makeInputRow(minTimestamp + 5, false, LONG_COL, null));
IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.NONE,
false,
null
);
final DimensionSpec dimensionSpec = new DefaultDimensionSpec(LONG_COL, LONG_COL, ColumnType.LONG);
List<Cursor> cursorList = cursorSequence.toList();
ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory();
ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL);
DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals(1L, valueSelector.getObject());
Assert.assertEquals(1L, valueSelector.getLong());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("1", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals(2L, valueSelector.getObject());
Assert.assertEquals(2L, valueSelector.getLong());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("2", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals(3L, valueSelector.getObject());
Assert.assertEquals(3L, valueSelector.getLong());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("3", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
if (NullHandling.sqlCompatible()) {
Assert.assertNull(valueSelector.getObject());
Assert.assertTrue(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertNull(dimensionSelector.getObject());
} else {
Assert.assertEquals(NullHandling.defaultLongValue(), valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals(
String.valueOf(NullHandling.defaultLongValue()),
dimensionSelector.lookupName(dimensionSelector.getRow().get(0))
);
Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), dimensionSelector.getObject());
}
columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
if (NullHandling.sqlCompatible()) {
Assert.assertNull(valueSelector.getObject());
Assert.assertTrue(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertNull(dimensionSelector.getObject());
} else {
Assert.assertEquals(NullHandling.defaultLongValue(), valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals(
String.valueOf(NullHandling.defaultLongValue()),
dimensionSelector.lookupName(dimensionSelector.getRow().get(0))
);
Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), dimensionSelector.getObject());
}
}
@Test
public void testNestedColumnIndexerSchemaDiscoveryRootDouble() throws IndexSizeExceededException
{
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = makeIncrementalIndex(minTimestamp);
index.add(makeInputRow(minTimestamp + 1, true, DOUBLE_COL, 1.1));
index.add(makeInputRow(minTimestamp + 2, true, DOUBLE_COL, 2.2));
index.add(makeInputRow(minTimestamp + 3, true, DOUBLE_COL, 3.3));
index.add(makeInputRow(minTimestamp + 4, true, DOUBLE_COL, null));
index.add(makeInputRow(minTimestamp + 5, false, DOUBLE_COL, null));
IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.NONE,
false,
null
);
final DimensionSpec dimensionSpec = new DefaultDimensionSpec(DOUBLE_COL, DOUBLE_COL, ColumnType.DOUBLE);
List<Cursor> cursorList = cursorSequence.toList();
ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory();
ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL);
DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals(1.1, valueSelector.getObject());
Assert.assertEquals(1.1, valueSelector.getDouble(), 0.0);
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("1.1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("1.1", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals(2.2, valueSelector.getObject());
Assert.assertEquals(2.2, valueSelector.getDouble(), 0.0);
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("2.2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("2.2", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
Assert.assertEquals(3.3, valueSelector.getObject());
Assert.assertEquals(3.3, valueSelector.getDouble(), 0.0);
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals("3.3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertEquals("3.3", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
if (NullHandling.sqlCompatible()) {
Assert.assertNull(valueSelector.getObject());
Assert.assertTrue(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertNull(dimensionSelector.getObject());
} else {
Assert.assertEquals(NullHandling.defaultDoubleValue(), valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals(
String.valueOf(NullHandling.defaultDoubleValue()),
dimensionSelector.lookupName(dimensionSelector.getRow().get(0))
);
Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), dimensionSelector.getObject());
}
columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL);
dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec);
if (NullHandling.sqlCompatible()) {
Assert.assertNull(valueSelector.getObject());
Assert.assertTrue(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0)));
Assert.assertNull(dimensionSelector.getObject());
} else {
Assert.assertEquals(NullHandling.defaultDoubleValue(), valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals(1, dimensionSelector.getRow().size());
Assert.assertEquals(
String.valueOf(NullHandling.defaultDoubleValue()),
dimensionSelector.lookupName(dimensionSelector.getRow().get(0))
);
Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), dimensionSelector.getObject());
}
}
@Test
public void testNestedColumnIndexerSchemaDiscoveryRootStringArray() throws IndexSizeExceededException
{
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = makeIncrementalIndex(minTimestamp);
index.add(makeInputRow(minTimestamp + 1, true, STRING_ARRAY_COL, new String[]{"a"}));
index.add(makeInputRow(minTimestamp + 2, true, STRING_ARRAY_COL, new Object[]{"b", "c"}));
index.add(makeInputRow(minTimestamp + 3, true, STRING_ARRAY_COL, ImmutableList.of("d", "e")));
index.add(makeInputRow(minTimestamp + 4, true, STRING_ARRAY_COL, null));
index.add(makeInputRow(minTimestamp + 5, false, STRING_ARRAY_COL, null));
IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.NONE,
false,
null
);
final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_ARRAY_COL, STRING_ARRAY_COL, ColumnType.STRING);
List<Cursor> cursorList = cursorSequence.toList();
ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory();
ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertArrayEquals(new Object[]{"a"}, (Object[]) valueSelector.getObject());
columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertArrayEquals(new Object[]{"b", "c"}, (Object[]) valueSelector.getObject());
columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertArrayEquals(new Object[]{"d", "e"}, (Object[]) valueSelector.getObject());
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertNull(valueSelector.getObject());
columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertNull(valueSelector.getObject());
}
@Test
public void testNestedColumnIndexerSchemaDiscoveryRootVariant() throws IndexSizeExceededException
{
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = makeIncrementalIndex(minTimestamp);
index.add(makeInputRow(minTimestamp + 1, true, VARIANT_COL, "a"));
index.add(makeInputRow(minTimestamp + 2, true, VARIANT_COL, 2L));
index.add(makeInputRow(minTimestamp + 3, true, VARIANT_COL, 3.3));
index.add(makeInputRow(minTimestamp + 4, true, VARIANT_COL, null));
index.add(makeInputRow(minTimestamp + 5, false, VARIANT_COL, null));
IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.NONE,
false,
null
);
final DimensionSpec dimensionSpec = new DefaultDimensionSpec(VARIANT_COL, VARIANT_COL, ColumnType.STRING);
List<Cursor> cursorList = cursorSequence.toList();
ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory();
ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL);
DimensionSelector dimensionSelector = cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
Assert.assertEquals("a", valueSelector.getObject());
Assert.assertEquals("a", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL);
dimensionSelector = cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
Assert.assertEquals(2L, valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals("2", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL);
dimensionSelector = cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
Assert.assertEquals(3.3, valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
Assert.assertEquals("3.3", dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL);
dimensionSelector = cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
Assert.assertNull(valueSelector.getObject());
Assert.assertNull(dimensionSelector.getObject());
columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL);
dimensionSelector = cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec);
Assert.assertNull(valueSelector.getObject());
Assert.assertNull(dimensionSelector.getObject());
}
@Test
public void testNestedColumnIndexerSchemaDiscoveryNested() throws IndexSizeExceededException
{
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = makeIncrementalIndex(minTimestamp);
index.add(makeInputRow(minTimestamp + 1, true, NESTED_COL, "a"));
index.add(makeInputRow(minTimestamp + 2, true, NESTED_COL, 2L));
index.add(makeInputRow(minTimestamp + 3, true, NESTED_COL, ImmutableMap.of("x", 1.1, "y", 2L)));
index.add(makeInputRow(minTimestamp + 4, true, NESTED_COL, null));
index.add(makeInputRow(minTimestamp + 5, false, NESTED_COL, null));
IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursorSequence = storageAdapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.NONE,
false,
null
);
final DimensionSpec dimensionSpec = new DefaultDimensionSpec(NESTED_COL, NESTED_COL, ColumnType.STRING);
List<Cursor> cursorList = cursorSequence.toList();
ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory();
ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertEquals(StructuredData.wrap("a"), valueSelector.getObject());
columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertEquals(StructuredData.wrap(2L), valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertEquals(StructuredData.wrap(ImmutableMap.of("x", 1.1, "y", 2L)), valueSelector.getObject());
Assert.assertFalse(valueSelector.isNull());
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertNull(valueSelector.getObject());
columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory();
valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec)
);
Assert.assertNull(valueSelector.getObject());
}
@Nonnull
private static IncrementalIndex makeIncrementalIndex(long minTimestamp)
{
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema(
minTimestamp,
new TimestampSpec(TIME_COL, "millis", null),
Granularities.NONE,
VirtualColumns.EMPTY,
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
new AggregatorFactory[0],
false
)
)
.setMaxRowCount(1000)
.build();
return index;
}
private MapBasedInputRow makeInputRow(
long timestamp,
boolean explicitNull,
Object... kv
)
{
final Map<String, Object> event = TestHelper.makeMap(explicitNull, kv);
event.put("time", timestamp);
return new MapBasedInputRow(timestamp, ImmutableList.copyOf(event.keySet()), event);
}
}
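
The coverage above now lives with AutoTypeColumnIndexer, which backs every column created through schema discovery. A smoke-test sketch assembled from the deleted makeIncrementalIndex() and makeInputRow() helpers, so the constructor signatures are grounded in that code:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;

public class AutoIndexerSmokeTest
{
  public static void main(String[] args) throws Exception
  {
    NestedDataModule.registerHandlersAndSerde();
    long minTimestamp = System.currentTimeMillis();
    // Same construction as the deleted test: schema discovery routes every
    // discovered column through the consolidated AutoTypeColumnIndexer.
    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
        .setIndexSchema(
            new IncrementalIndexSchema(
                minTimestamp,
                new TimestampSpec("time", "millis", null),
                Granularities.NONE,
                VirtualColumns.EMPTY,
                DimensionsSpec.builder().useSchemaDiscovery(true).build(),
                new AggregatorFactory[0],
                false
            )
        )
        .setMaxRowCount(1000)
        .build();
    Map<String, Object> event = ImmutableMap.of(
        "time", minTimestamp + 1,
        "nested", ImmutableMap.of("x", 1.1, "y", 2L)
    );
    index.add(new MapBasedInputRow(minTimestamp + 1, ImmutableList.copyOf(event.keySet()), event));
    System.out.println(index.getDimensionNames()); // prints the discovered dimension names
  }
}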

View File

@ -41,7 +41,7 @@ public class SimpleDictionaryMergingIteratorTest
};
SimpleDictionaryMergingIterator<String> dictionaryMergeIterator = new SimpleDictionaryMergingIterator<>(
sortedLookups,
NestedDataColumnMerger.STRING_MERGING_COMPARATOR
AutoTypeColumnMerger.STRING_MERGING_COMPARATOR
);
List<String> expectedSequence = Lists.newArrayListWithExpectedSize(13);
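
SimpleDictionaryMergingIterator performs an ordered, de-duplicating k-way merge over the per-segment sorted dictionaries using the comparator above. The shape of that merge in plain Java, as an illustration rather than the Druid class itself:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;

public class KWayDictionaryMerge
{
  private static class Entry
  {
    final String value;
    final Iterator<String> source;

    Entry(String value, Iterator<String> source)
    {
      this.value = value;
      this.source = source;
    }
  }

  // Merge several sorted dictionaries into one sorted, de-duplicated stream,
  // which is the job SimpleDictionaryMergingIterator does during segment merging.
  static List<String> merge(List<Iterator<String>> dictionaries, Comparator<String> comparator)
  {
    PriorityQueue<Entry> heap = new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
    for (Iterator<String> dictionary : dictionaries) {
      if (dictionary.hasNext()) {
        heap.add(new Entry(dictionary.next(), dictionary));
      }
    }
    List<String> merged = new ArrayList<>();
    String last = null;
    boolean first = true;
    while (!heap.isEmpty()) {
      Entry smallest = heap.poll();
      if (first || !Objects.equals(last, smallest.value)) {
        merged.add(smallest.value); // skip duplicates shared across dictionaries
        last = smallest.value;
        first = false;
      }
      if (smallest.source.hasNext()) {
        heap.add(new Entry(smallest.source.next(), smallest.source));
      }
    }
    return merged;
  }

  public static void main(String[] args)
  {
    List<Iterator<String>> sorted = Arrays.asList(
        Arrays.asList("a", "c", "d").iterator(),
        Arrays.asList("b", "c").iterator()
    );
    System.out.println(merge(sorted, Comparator.naturalOrder())); // [a, b, c, d]
  }
}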

View File

@ -22,49 +22,28 @@ package org.apache.druid.segment.nested;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.filter.SelectorPredicateFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.NestedDataColumnIndexer;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.index.BitmapColumnIndex;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.serde.ColumnPartSerde;
import org.apache.druid.segment.serde.ComplexColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@ -73,18 +52,10 @@ import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
{
@ -106,178 +77,20 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
TestHelper.makeMap("x", 4L, "y", 2.0, "z", "e", "v", 11111L, "nullish", null)
);
List<Map<String, Object>> arrayTestData = ImmutableList.of(
TestHelper.makeMap("s", new Object[]{"a", "b", "c"}, "l", new Object[]{1L, 2L, 3L}, "d", new Object[]{1.1, 2.2}),
TestHelper.makeMap(
"s",
new Object[]{null, "b", "c"},
"l",
new Object[]{1L, null, 3L},
"d",
new Object[]{2.2, 2.2}
),
TestHelper.makeMap(
"s",
new Object[]{"b", "c"},
"l",
new Object[]{null, null},
"d",
new Object[]{1.1, null, 2.2}
),
TestHelper.makeMap("s", new Object[]{"a", "b", "c", "d"}, "l", new Object[]{4L, 2L, 3L}),
TestHelper.makeMap("s", new Object[]{"d", "b", "c", "a"}, "d", new Object[]{1.1, 2.2}),
TestHelper.makeMap("l", new Object[]{1L, 2L, 3L}, "d", new Object[]{3.1, 2.2, 1.9})
);
Closer closer = Closer.create();
SmooshedFileMapper fileMapper;
ByteBuffer baseBuffer;
SmooshedFileMapper arrayFileMapper;
ByteBuffer arrayBaseBuffer;
@BeforeClass
public static void staticSetup()
{
NestedDataModule.registerHandlersAndSerde();
}
@Before
public void setup() throws IOException
{
final String fileNameBase = "test";
final String arrayFileNameBase = "array";
fileMapper = smooshify(fileNameBase, tempFolder.newFolder(), data);
baseBuffer = fileMapper.mapFile(fileNameBase);
arrayFileMapper = smooshify(arrayFileNameBase, tempFolder.newFolder(), arrayTestData);
arrayBaseBuffer = arrayFileMapper.mapFile(arrayFileNameBase);
}
private SmooshedFileMapper smooshify(
String fileNameBase,
File tmpFile,
List<Map<String, Object>> data
)
throws IOException
{
SegmentWriteOutMediumFactory writeOutMediumFactory = TmpFileSegmentWriteOutMediumFactory.instance();
try (final FileSmoosher smoosher = new FileSmoosher(tmpFile)) {
NestedDataColumnSerializerV4 serializer = new NestedDataColumnSerializerV4(
fileNameBase,
IndexSpec.DEFAULT,
writeOutMediumFactory.makeSegmentWriteOutMedium(tempFolder.newFolder()),
closer
);
NestedDataColumnIndexer indexer = new NestedDataColumnIndexer();
for (Object o : data) {
indexer.processRowValsToUnsortedEncodedKeyComponent(o, false);
}
SortedMap<String, FieldTypeInfo.MutableTypeSet> sortedFields = new TreeMap<>();
IndexableAdapter.NestedColumnMergable mergable = closer.register(
new IndexableAdapter.NestedColumnMergable(
indexer.getSortedValueLookups(),
indexer.getFieldTypeInfo(),
true,
false,
null
)
);
SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
serializer.open();
serializer.serializeFields(sortedFields);
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),
globalDictionarySortedCollector.getSortedDoubles()
);
SettableSelector valueSelector = new SettableSelector();
for (Object o : data) {
valueSelector.setObject(StructuredData.wrap(o));
serializer.serialize(valueSelector);
}
try (SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize())) {
serializer.writeTo(writer, smoosher);
}
smoosher.close();
return closer.register(SmooshedFileMapper.load(tmpFile));
}
}
@After
public void teardown() throws IOException
{
closer.close();
}
@Test
public void testBasicFunctionality() throws IOException
{
ColumnBuilder bob = new ColumnBuilder();
bob.setFileMapper(fileMapper);
ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.createDeserializer(NestedDataComplexTypeSerde.TYPE_NAME);
ColumnPartSerde.Deserializer deserializer = partSerde.getDeserializer();
deserializer.read(baseBuffer, bob, NestedFieldColumnIndexSupplierTest.ALWAYS_USE_INDEXES);
final ColumnHolder holder = bob.build();
final ColumnCapabilities capabilities = holder.getCapabilities();
Assert.assertEquals(ColumnType.NESTED_DATA, capabilities.toColumnType());
Assert.assertTrue(holder.getColumnFormat() instanceof NestedDataComplexTypeSerde.NestedColumnFormatV4);
try (NestedDataComplexColumn column = (NestedDataComplexColumn) holder.getColumn()) {
smokeTest(column);
}
}
@Test
public void testConcurrency() throws ExecutionException, InterruptedException
{
// if this test ever starts to flake, there might be thread safety issues
ColumnBuilder bob = new ColumnBuilder();
bob.setFileMapper(fileMapper);
NestedDataColumnSupplierV4 supplier = NestedDataColumnSupplierV4.read(
baseBuffer,
bob,
NestedFieldColumnIndexSupplierTest.ALWAYS_USE_INDEXES,
NestedDataComplexTypeSerde.OBJECT_MAPPER
);
final String expectedReason = "none";
final AtomicReference<String> failureReason = new AtomicReference<>(expectedReason);
final int threads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d")
);
Collection<ListenableFuture<?>> futures = new ArrayList<>(threads);
final CountDownLatch threadsStartLatch = new CountDownLatch(1);
for (int i = 0; i < threads; ++i) {
futures.add(
executorService.submit(() -> {
try {
threadsStartLatch.await();
for (int iter = 0; iter < 5000; iter++) {
try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) {
smokeTest(column);
}
}
}
catch (Throwable ex) {
failureReason.set(ex.getMessage());
}
})
);
}
threadsStartLatch.countDown();
Futures.allAsList(futures).get();
Assert.assertEquals(expectedReason, failureReason.get());
}
@Test
public void testLegacyV3ReaderFormat() throws IOException
{
@ -538,85 +351,4 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
Assert.assertFalse(dimSelector.makeValueMatcher(x -> Objects.equals(x, NO_MATCH)).matches());
}
}
private static class SettableSelector extends ObjectColumnSelector<StructuredData>
{
private StructuredData data;
public void setObject(StructuredData o)
{
this.data = o;
}
@Nullable
@Override
public StructuredData getObject()
{
return data;
}
@Override
public Class classOfObject()
{
return StructuredData.class;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
private static class OnlyPositionalReadsTypeStrategy<T> implements TypeStrategy<T>
{
private final TypeStrategy<T> delegate;
private OnlyPositionalReadsTypeStrategy(TypeStrategy<T> delegate)
{
this.delegate = delegate;
}
@Override
public int estimateSizeBytes(T value)
{
return delegate.estimateSizeBytes(value);
}
@Override
public T read(ByteBuffer buffer)
{
throw new IllegalStateException("non-positional read");
}
@Override
public boolean readRetainsBufferReference()
{
return delegate.readRetainsBufferReference();
}
@Override
public int write(ByteBuffer buffer, T value, int maxSizeBytes)
{
return delegate.write(buffer, value, maxSizeBytes);
}
@Override
public T read(ByteBuffer buffer, int offset)
{
return delegate.read(buffer, offset);
}
@Override
public int write(ByteBuffer buffer, int offset, T value, int maxSizeBytes)
{
return delegate.write(buffer, offset, value, maxSizeBytes);
}
@Override
public int compare(Object o1, Object o2)
{
return delegate.compare(o1, o2);
}
}
}