improve nested column storage format for broader compatibility (#13568)

* bump nested column format version
changes:
* nested field files are now named by their position in the field paths list rather than directly by the path itself; this fixes issues where valid JSON property names containing commas or newlines broke the CSV-formatted meta.smoosh file
* update StructuredDataProcessor to deal in NestedPathPart, consistent with other abstract path handling, rather than building jq syntax strings directly
* add a v3 format segment and test
Clint Wylie 2022-12-15 15:39:26 -08:00 committed by GitHub
parent 7f3c117e3a
commit 9ae7a36ccd
15 changed files with 349 additions and 88 deletions
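Why the rename matters: in the v3 format each nested field's internal smoosh file was named after the field's jq path, so any legal JSON key containing a CSV-significant character (a comma or a newline) leaked into meta.smoosh, the CSV manifest of internal file names and offsets, and corrupted it. The following sketch is not part of the commit; it is a hypothetical driver that contrasts the two format strings visible in the diffs below (the key "tags,primary" is an invented example):

import org.apache.druid.java.util.common.StringUtils;

public class FieldFileNameExample
{
  public static void main(String[] args)
  {
    // v3: file name derived from the jq path itself ("%s_%s"); a comma in the
    // JSON key ends up inside the smoosh file name and breaks the CSV manifest
    String v3Name = StringUtils.format("%s_%s", "shipTo", ".\"tags,primary\"");
    // v4: file name derived from the field's position in the sorted field list
    // ("%s.%s" plus the "__field_" prefix); always CSV-safe
    String v4Name = StringUtils.format("%s.%s", "shipTo", "__field_0");
    System.out.println(v3Name); // shipTo_."tags,primary"
    System.out.println(v4Name); // shipTo.__field_0
  }
}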

View File

@@ -38,6 +38,7 @@ import org.apache.druid.segment.nested.StructuredDataProcessor;
import javax.annotation.Nullable;
import javax.inject.Inject;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -479,7 +480,7 @@ public class NestedDataExpressions
final StructuredDataProcessor processor = new StructuredDataProcessor()
{
@Override
-public int processLiteralField(String fieldName, Object fieldValue)
+public int processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
{
// do nothing, we only want the list of fields returned by this processor
return 0;
@@ -501,7 +502,7 @@ public class NestedDataExpressions
StructuredDataProcessor.ProcessResults info = processor.processFields(unwrap(input));
List<String> transformed = info.getLiteralFields()
.stream()
-.map(p -> NestedPathFinder.toNormalizedJsonPath(NestedPathFinder.parseJqPath(p)))
+.map(NestedPathFinder::toNormalizedJsonPath)
.collect(Collectors.toList());
return ExprEval.ofType(
ExpressionType.STRING_ARRAY,

View File

@@ -35,10 +35,13 @@ import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
import org.apache.druid.segment.nested.GlobalDimensionDictionary;
import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.NestedPathPart;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.segment.nested.StructuredDataProcessor;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
@@ -56,8 +59,9 @@ public class NestedDataColumnIndexer implements DimensionIndexer<StructuredData,
protected final StructuredDataProcessor indexerProcessor = new StructuredDataProcessor()
{
@Override
-public int processLiteralField(String fieldName, Object fieldValue)
+public int processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
{
+final String fieldName = NestedPathFinder.toNormalizedJsonPath(fieldPath);
LiteralFieldIndexer fieldIndexer = fieldIndexers.get(fieldName);
if (fieldIndexer == null) {
estimatedFieldKeySize += StructuredDataProcessor.estimateStringSize(fieldName);

View File

@@ -77,7 +77,7 @@ import java.util.concurrent.ConcurrentHashMap;
* Implementation of {@link NestedDataComplexColumn} which uses a {@link CompressedVariableSizedBlobColumn} for the
* 'raw' {@link StructuredData} values and provides selectors for nested 'literal' field columns.
*/
-public final class CompressedNestedDataComplexColumn<TStringDictionary extends Indexed<ByteBuffer>>
+public abstract class CompressedNestedDataComplexColumn<TStringDictionary extends Indexed<ByteBuffer>>
extends NestedDataComplexColumn
{
private final NestedDataColumnMetadata metadata;
@@ -123,6 +123,12 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
this.compressedRawColumnSupplier = compressedRawColumnSupplier;
}
+
+public abstract List<NestedPathPart> parsePath(String path);
+
+public abstract String getField(List<NestedPathPart> path);
+
+public abstract String getFieldFileName(String fileNameBase, String field, int fieldIndex);
public GenericIndexed<String> getFields()
{
return fields;
@@ -133,7 +139,7 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
{
List<List<NestedPathPart>> fieldParts = new ArrayList<>(fields.size());
for (int i = 0; i < fields.size(); i++) {
-fieldParts.add(NestedPathFinder.parseJqPath(fields.get(i)));
+fieldParts.add(parsePath(fields.get(i)));
}
return fieldParts;
}
@@ -405,11 +411,6 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
return getColumnHolder(field).getCapabilities().isNumeric();
}
-
-private String getField(List<NestedPathPart> path)
-{
-return NestedPathFinder.toNormalizedJqPath(path);
-}
private ColumnHolder getColumnHolder(String field)
{
return columns.computeIfAbsent(field, this::readNestedFieldColumn);
@@ -421,12 +422,17 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
if (fields.indexOf(field) < 0) {
return null;
}
-final NestedLiteralTypeInfo.TypeSet types = fieldInfo.getTypes(fields.indexOf(field));
-final ByteBuffer dataBuffer = fileMapper.mapFile(
-NestedDataColumnSerializer.getFieldFileName(metadata.getFileNameBase(), field)
-);
+final int fieldIndex = fields.indexOf(field);
+final NestedLiteralTypeInfo.TypeSet types = fieldInfo.getTypes(fieldIndex);
+final String fieldFileName = getFieldFileName(metadata.getFileNameBase(), field, fieldIndex);
+final ByteBuffer dataBuffer = fileMapper.mapFile(fieldFileName);
if (dataBuffer == null) {
-throw new ISE("Can't find field [%s] in [%s] file.", field, metadata.getFileNameBase());
+throw new ISE(
+"Can't find field [%s] with name [%s] in [%s] file.",
+field,
+fieldFileName,
+metadata.getFileNameBase()
+);
}
ColumnBuilder columnBuilder = new ColumnBuilder().setFileMapper(fileMapper);

View File

@@ -239,7 +239,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
bitmapIndexWriter.writeTo(channel, smoosher);
}
};
-final String fieldFileName = NestedDataColumnSerializer.getFieldFileName(columnName, fieldName);
+final String fieldFileName = NestedDataColumnSerializer.getInternalFileName(columnName, fieldName);
final long size = fieldSerializer.getSerializedSize();
log.debug("Column [%s] serializing [%s] field of size [%d].", columnName, fieldName, size);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldFileName, size)) {

View File

@@ -56,6 +56,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
import java.util.Map;
import java.util.SortedMap;
@@ -69,6 +70,8 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
public static final String RAW_FILE_NAME = "__raw";
public static final String NULL_BITMAP_FILE_NAME = "__nullIndex";
+public static final String NESTED_FIELD_PREFIX = "__field_";
+
private final String name;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final IndexSpec indexSpec;
@@ -78,9 +81,11 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
private final StructuredDataProcessor fieldProcessor = new StructuredDataProcessor()
{
@Override
-public int processLiteralField(String fieldName, Object fieldValue)
+public int processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
{
-final GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.get(fieldName);
+final GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.get(
+NestedPathFinder.toNormalizedJsonPath(fieldPath)
+);
if (writer != null) {
try {
ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
@@ -180,8 +185,10 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
{
this.fields = fields;
this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size());
+int ctr = 0;
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
final String fieldName = field.getKey();
+final String fieldFileName = NESTED_FIELD_PREFIX + ctr++;
fieldsWriter.write(fieldName);
fieldsInfoWriter.write(field.getValue());
final GlobalDictionaryEncodedFieldColumnWriter<?> writer;
@@ -190,7 +197,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
if (Types.is(type, ValueType.STRING)) {
writer = new StringFieldColumnWriter(
name,
-fieldName,
+fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
@@ -198,7 +205,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
} else if (Types.is(type, ValueType.LONG)) {
writer = new LongFieldColumnWriter(
name,
-fieldName,
+fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
@@ -206,7 +213,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
} else {
writer = new DoubleFieldColumnWriter(
name,
-fieldName,
+fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
@@ -215,7 +222,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
} else {
writer = new VariantLiteralFieldColumnWriter(
name,
-fieldName,
+fieldFileName,
segmentWriteOutMedium,
indexSpec,
globalDictionaryIdLookup
@@ -317,8 +324,8 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?");
-// version 3
-channel.write(ByteBuffer.wrap(new byte[]{0x03}));
+// version 4
+channel.write(ByteBuffer.wrap(new byte[]{0x04}));
channel.write(ByteBuffer.wrap(metadataBytes));
fieldsWriter.writeTo(channel, smoosher);
fieldsInfoWriter.writeTo(channel, smoosher);
@@ -341,6 +348,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
if (channel instanceof SmooshedWriter) {
channel.close();
}
+
for (Map.Entry<String, NestedLiteralTypeInfo.MutableTypeSet> field : fields.entrySet()) {
// remove writer so that it can be collected when we are done with it
GlobalDictionaryEncodedFieldColumnWriter<?> writer = fieldWriters.remove(field.getKey());
@@ -357,11 +365,6 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
}
}
-
-public static String getFieldFileName(String fileNameBase, String field)
-{
-return StringUtils.format("%s_%s", fileNameBase, field);
-}
public static String getInternalFileName(String fileNameBase, String field)
{
return StringUtils.format("%s.%s", fileNameBase, field);
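To make the new naming concrete: the serializer assigns file names by walking the sorted field map with a counter, so the nth field is stored as <column>.__field_<n> regardless of its path. A small sketch using the two public members above (the field paths and column name here are invented):

import com.google.common.collect.ImmutableList;
import org.apache.druid.segment.nested.NestedDataColumnSerializer;

public class PositionalNamingExample
{
  public static void main(String[] args)
  {
    int ctr = 0;
    // hypothetical field paths, in the sorted order the serializer would see them
    for (String fieldPath : ImmutableList.of("$.array[0]", "$.n.x")) {
      String internalName = NestedDataColumnSerializer.getInternalFileName(
          "col",
          NestedDataColumnSerializer.NESTED_FIELD_PREFIX + ctr++
      );
      System.out.println(fieldPath + " -> " + internalName);
    }
    // prints: $.array[0] -> col.__field_0
    //         $.n.x -> col.__field_1
  }
}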

View File

@@ -45,6 +45,7 @@ import java.nio.ByteBuffer;
public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
{
+private final byte version;
private final NestedDataColumnMetadata metadata;
private final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier;
private final ImmutableBitmap nullValues;
@@ -78,9 +79,9 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
TypeStrategy<Double> doubleTypeStrategy
)
{
-byte version = bb.get();
-if (version == 0x03) {
+this.version = bb.get();
+if (version == 0x03 || version == 0x04) {
try {
final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
metadata = jsonMapper.readValue(
@@ -157,10 +158,10 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
}
}
catch (IOException ex) {
-throw new RE(ex, "Failed to deserialize V3 column.");
+throw new RE(ex, "Failed to deserialize V%s column.", version);
}
} else {
-throw new RE("Unknown version" + version);
+throw new RE("Unknown version " + version);
}
fileMapper = Preconditions.checkNotNull(columnBuilder.getFileMapper(), "Null fileMapper");
@@ -170,9 +171,17 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
@Override
public ComplexColumn get()
+{
+if (version == 0x03) {
+return makeV3();
+}
+return makeV4();
+}
+
+private NestedDataColumnV3 makeV3()
{
if (frontCodedDictionarySupplier != null) {
-return new CompressedNestedDataComplexColumn<>(
+return new NestedDataColumnV3<>(
metadata,
columnConfig,
compressedRawColumnSupplier,
@@ -185,7 +194,37 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
fileMapper
);
}
-return new CompressedNestedDataComplexColumn<>(
+return new NestedDataColumnV3<>(
+metadata,
+columnConfig,
+compressedRawColumnSupplier,
+nullValues,
+fields,
+fieldInfo,
+dictionary::singleThreaded,
+longDictionarySupplier,
+doubleDictionarySupplier,
+fileMapper
+);
+}
+
+private NestedDataColumnV4 makeV4()
+{
+if (frontCodedDictionarySupplier != null) {
+return new NestedDataColumnV4<>(
+metadata,
+columnConfig,
+compressedRawColumnSupplier,
+nullValues,
+fields,
+fieldInfo,
+frontCodedDictionarySupplier,
+longDictionarySupplier,
+doubleDictionarySupplier,
+fileMapper
+);
+}
+return new NestedDataColumnV4<>(
metadata,
columnConfig,
compressedRawColumnSupplier,
@@ -202,9 +241,7 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
private ByteBuffer loadInternalFile(SmooshedFileMapper fileMapper, String internalFileName) throws IOException
{
return fileMapper.mapFile(
-NestedDataColumnSerializer.getInternalFileName(
-metadata.getFileNameBase(), internalFileName
-)
+NestedDataColumnSerializer.getInternalFileName(metadata.getFileNameBase(), internalFileName)
);
}
}

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.nested;
import com.google.common.base.Supplier;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import java.nio.ByteBuffer;
import java.util.List;
public final class NestedDataColumnV3<TStringDictionary extends Indexed<ByteBuffer>>
extends CompressedNestedDataComplexColumn<TStringDictionary>
{
public NestedDataColumnV3(
NestedDataColumnMetadata metadata,
ColumnConfig columnConfig,
CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier,
ImmutableBitmap nullValues,
GenericIndexed<String> fields,
NestedLiteralTypeInfo fieldInfo,
Supplier<TStringDictionary> stringDictionary,
Supplier<FixedIndexed<Long>> longDictionarySupplier,
Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
SmooshedFileMapper fileMapper
)
{
super(
metadata,
columnConfig,
compressedRawColumnSupplier,
nullValues,
fields,
fieldInfo,
stringDictionary,
longDictionarySupplier,
doubleDictionarySupplier,
fileMapper
);
}
@Override
public List<NestedPathPart> parsePath(String path)
{
return NestedPathFinder.parseJqPath(path);
}
@Override
public String getFieldFileName(String fileNameBase, String field, int fieldIndex)
{
return StringUtils.format("%s_%s", fileNameBase, field);
}
@Override
public String getField(List<NestedPathPart> path)
{
return NestedPathFinder.toNormalizedJqPath(path);
}
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.nested;
import com.google.common.base.Supplier;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import java.nio.ByteBuffer;
import java.util.List;
public final class NestedDataColumnV4<TStringDictionary extends Indexed<ByteBuffer>>
extends CompressedNestedDataComplexColumn<TStringDictionary>
{
public NestedDataColumnV4(
NestedDataColumnMetadata metadata,
ColumnConfig columnConfig,
CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier,
ImmutableBitmap nullValues,
GenericIndexed<String> fields,
NestedLiteralTypeInfo fieldInfo,
Supplier<TStringDictionary> stringDictionary,
Supplier<FixedIndexed<Long>> longDictionarySupplier,
Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
SmooshedFileMapper fileMapper
)
{
super(
metadata,
columnConfig,
compressedRawColumnSupplier,
nullValues,
fields,
fieldInfo,
stringDictionary,
longDictionarySupplier,
doubleDictionarySupplier,
fileMapper
);
}
@Override
public List<NestedPathPart> parsePath(String path)
{
return NestedPathFinder.parseJsonPath(path);
}
@Override
public String getFieldFileName(String fileNameBase, String field, int fieldIndex)
{
return NestedDataColumnSerializer.getInternalFileName(
fileNameBase,
NestedDataColumnSerializer.NESTED_FIELD_PREFIX + fieldIndex
);
}
@Override
public String getField(List<NestedPathPart> path)
{
return NestedPathFinder.toNormalizedJsonPath(path);
}
}

View File

@@ -162,7 +162,7 @@ public class NestedPathFinder
public static String toNormalizedJqPath(List<NestedPathPart> paths)
{
if (paths.isEmpty()) {
-return StructuredDataProcessor.ROOT_LITERAL;
+return ".";
}
StringBuilder bob = new StringBuilder();
boolean first = true;
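The two normalized renderings now serve different format versions: v3 column internals use the jq form, while v4 (and the json_paths expression) use the JSONPath form. A quick sketch of the difference, assuming the NestedPathPart constructors shown elsewhere in this diff; the printed forms are the expected outputs, not verified here:

import com.google.common.collect.ImmutableList;
import org.apache.druid.segment.nested.NestedPathArrayElement;
import org.apache.druid.segment.nested.NestedPathField;
import org.apache.druid.segment.nested.NestedPathFinder;
import org.apache.druid.segment.nested.NestedPathPart;

import java.util.List;

public class PathRenderingExample
{
  public static void main(String[] args)
  {
    List<NestedPathPart> path = ImmutableList.of(
        new NestedPathField("x"),
        new NestedPathArrayElement(0)
    );
    System.out.println(NestedPathFinder.toNormalizedJqPath(path));   // expected: ."x"[0]
    System.out.println(NestedPathFinder.toNormalizedJsonPath(path)); // expected: $.x[0]
  }
}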

View File

@@ -19,10 +19,9 @@
package org.apache.druid.segment.nested;
-import com.google.common.collect.ImmutableSet;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,29 +30,23 @@ import java.util.Set;
public abstract class StructuredDataProcessor
{
-public static final String ROOT_LITERAL = ".";
-private static final Set<String> ROOT_LITERAL_FIELDS = ImmutableSet.of(ROOT_LITERAL);
-
-public abstract int processLiteralField(String fieldName, Object fieldValue);
+public abstract int processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue);
/**
-* Process fields, returning a list of all "normalized" 'jq' paths to literal fields, consistent with the output of
-* {@link NestedPathFinder#toNormalizedJqPath(List)}.
-*
-* Note: in the future, {@link ProcessResults#getLiteralFields()} should instead probably be modified to deal in
-* lists of {@link NestedPathPart} instead so that callers can decide how to represent the path instead of assuing
-* 'jq' syntax.
+* Process fields, returning a list of all paths to literal fields, represented as an ordered sequence of
+* {@link NestedPathPart}.
*/
public ProcessResults processFields(Object raw)
{
Queue<Field> toProcess = new ArrayDeque<>();
raw = StructuredData.unwrap(raw);
+ArrayList<NestedPathPart> newPath = new ArrayList<>();
if (raw instanceof Map) {
-toProcess.add(new MapField("", (Map<String, ?>) raw));
+toProcess.add(new MapField(newPath, (Map<String, ?>) raw));
} else if (raw instanceof List) {
-toProcess.add(new ListField(ROOT_LITERAL, (List<?>) raw));
+toProcess.add(new ListField(newPath, (List<?>) raw));
} else {
-return new ProcessResults().withFields(ROOT_LITERAL_FIELDS).withSize(processLiteralField(ROOT_LITERAL, raw));
+return new ProcessResults().addLiteralField(newPath, processLiteralField(newPath, raw));
}
ProcessResults accumulator = new ProcessResults();
@@ -76,17 +69,18 @@ public abstract class StructuredDataProcessor
for (Map.Entry<String, ?> entry : map.getMap().entrySet()) {
// add estimated size of string key
processResults.addSize(estimateStringSize(entry.getKey()));
-final String fieldName = map.getName() + ".\"" + entry.getKey() + "\"";
Object value = StructuredData.unwrap(entry.getValue());
// lists and maps go back in the queue
+final ArrayList<NestedPathPart> newPath = new ArrayList<>(map.getPath());
+newPath.add(new NestedPathField(entry.getKey()));
if (value instanceof List) {
List<?> theList = (List<?>) value;
-toProcess.add(new ListField(fieldName, theList));
+toProcess.add(new ListField(newPath, theList));
} else if (value instanceof Map) {
-toProcess.add(new MapField(fieldName, (Map<String, ?>) value));
+toProcess.add(new MapField(newPath, (Map<String, ?>) value));
} else {
// literals get processed
-processResults.addLiteralField(fieldName, processLiteralField(fieldName, value));
+processResults.addLiteralField(newPath, processLiteralField(newPath, value));
}
}
return processResults;
@@ -98,16 +92,17 @@ public abstract class StructuredDataProcessor
ProcessResults results = new ProcessResults().withSize(8);
final List<?> theList = list.getList();
for (int i = 0; i < theList.size(); i++) {
-final String listFieldName = list.getName() + "[" + i + "]";
+final ArrayList<NestedPathPart> newPath = new ArrayList<>(list.getPath());
+newPath.add(new NestedPathArrayElement(i));
final Object element = StructuredData.unwrap(theList.get(i));
// maps and lists go back into the queue
if (element instanceof Map) {
-toProcess.add(new MapField(listFieldName, (Map<String, ?>) element));
+toProcess.add(new MapField(newPath, (Map<String, ?>) element));
} else if (element instanceof List) {
-toProcess.add(new ListField(listFieldName, (List<?>) element));
+toProcess.add(new ListField(newPath, (List<?>) element));
} else {
// literals get processed
-results.addLiteralField(listFieldName, processLiteralField(listFieldName, element));
+results.addLiteralField(newPath, processLiteralField(newPath, element));
}
}
return results;
@@ -115,16 +110,16 @@ public abstract class StructuredDataProcessor
abstract static class Field
{
-private final String name;
+private final ArrayList<NestedPathPart> path;
-protected Field(String name)
+protected Field(ArrayList<NestedPathPart> path)
{
-this.name = name;
+this.path = path;
}
-public String getName()
+public ArrayList<NestedPathPart> getPath()
{
-return name;
+return path;
}
}
@@ -132,9 +127,9 @@ public abstract class StructuredDataProcessor
{
private final List<?> list;
-ListField(String name, List<?> list)
+ListField(ArrayList<NestedPathPart> path, List<?> list)
{
-super(name);
+super(path);
this.list = list;
}
@@ -148,9 +143,9 @@ public abstract class StructuredDataProcessor
{
private final Map<String, ?> map;
-MapField(String name, Map<String, ?> map)
+MapField(ArrayList<NestedPathPart> path, Map<String, ?> map)
{
-super(name);
+super(path);
this.map = map;
}
@@ -165,7 +160,7 @@ public abstract class StructuredDataProcessor
*/
public static class ProcessResults
{
-private Set<String> literalFields;
+private Set<ArrayList<NestedPathPart>> literalFields;
private int estimatedSize;
public ProcessResults()
@@ -174,7 +169,7 @@ public abstract class StructuredDataProcessor
estimatedSize = 0;
}
-public Set<String> getLiteralFields()
+public Set<ArrayList<NestedPathPart>> getLiteralFields()
{
return literalFields;
}
@@ -190,19 +185,13 @@ public abstract class StructuredDataProcessor
return this;
}
-public ProcessResults addLiteralField(String fieldName, int sizeOfValue)
+public ProcessResults addLiteralField(ArrayList<NestedPathPart> fieldPath, int sizeOfValue)
{
-literalFields.add(fieldName);
+literalFields.add(fieldPath);
this.estimatedSize += sizeOfValue;
return this;
}
-
-public ProcessResults withFields(Set<String> fields)
-{
-this.literalFields = fields;
-return this;
-}
public ProcessResults withSize(int size)
{
this.estimatedSize = size;
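Since ProcessResults now deals in path-part lists, a caller decides how to render each path. Here is a minimal sketch of a processor subclass; the driver and sample data are invented, and only the API shapes come from this diff:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.segment.nested.NestedPathFinder;
import org.apache.druid.segment.nested.NestedPathPart;
import org.apache.druid.segment.nested.StructuredDataProcessor;

import java.util.ArrayList;

public class PathCollectingExample
{
  public static void main(String[] args)
  {
    StructuredDataProcessor printer = new StructuredDataProcessor()
    {
      @Override
      public int processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
      {
        // the processor hands over abstract path parts; rendering is the caller's choice
        System.out.println(NestedPathFinder.toNormalizedJsonPath(fieldPath) + " = " + fieldValue);
        return 0;
      }
    };
    // expected output: $.x.y = 1, $.z[0] = 2, $.z[1] = 3 (traversal order may vary)
    printer.processFields(ImmutableMap.of("x", ImmutableMap.of("y", 1), "z", ImmutableList.of(2, 3)));
  }
}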

View File

@@ -129,12 +129,12 @@ public class NestedDataExpressionsTest extends InitializedNullHandlingTest
Expr expr = Parser.parse("json_paths(nest)", MACRO_TABLE);
ExprEval eval = expr.eval(inputBindings);
Assert.assertEquals(ExpressionType.STRING_ARRAY, eval.type());
-Assert.assertArrayEquals(new Object[]{"$.y", "$.z", "$.x"}, (Object[]) eval.value());
+Assert.assertArrayEquals(new Object[]{"$.x", "$.y", "$.z"}, (Object[]) eval.value());
expr = Parser.parse("json_paths(nester)", MACRO_TABLE);
eval = expr.eval(inputBindings);
Assert.assertEquals(ExpressionType.STRING_ARRAY, eval.type());
-Assert.assertArrayEquals(new Object[]{"$.x[0]", "$.x[1]", "$.x[2]", "$.y.b", "$.y.a"}, (Object[]) eval.value());
+Assert.assertArrayEquals(new Object[]{"$.x[0]", "$.y.a", "$.x[1]", "$.y.b", "$.x[2]"}, (Object[]) eval.value());
}

@Test

View File

@@ -38,7 +38,7 @@ public class NestedDataColumnIndexerTest extends InitializedNullHandlingTest
EncodedKeyComponent<StructuredData> key;
// new raw value, new field, new dictionary entry
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false);
-Assert.assertEquals(230, key.getEffectiveSizeBytes());
+Assert.assertEquals(228, key.getEffectiveSizeBytes());
Assert.assertEquals(1, indexer.getCardinality());
// adding same value only adds estimated size of value itself
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false);
@@ -67,7 +67,7 @@ public class NestedDataColumnIndexerTest extends InitializedNullHandlingTest
Assert.assertEquals(5, indexer.getCardinality());
// new raw value, new fields
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), false);
-Assert.assertEquals(292, key.getEffectiveSizeBytes());
+Assert.assertEquals(286, key.getEffectiveSizeBytes());
Assert.assertEquals(5, indexer.getCardinality());
// new raw value
key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), false);

View File

@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
+import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
@@ -41,9 +42,12 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.NestedDataColumnIndexer;
import org.apache.druid.segment.ObjectColumnSelector;
+import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.BitmapColumnIndex;
import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.DruidPredicateIndex;
@@ -52,9 +56,11 @@ import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.utils.CompressionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -102,6 +108,12 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
ByteBuffer baseBuffer;
+
+@BeforeClass
+public static void staticSetup()
+{
+NestedDataModule.registerHandlersAndSerde();
+}
@Before
public void setup() throws IOException
{
@@ -214,6 +226,49 @@ public class NestedDataColumnSupplierTest extends InitializedNullHandlingTest
Assert.assertEquals(expectedReason, failureReason.get());
}
+
+@Test
+public void testLegacyV3ReaderFormat() throws IOException
+{
+String columnName = "shipTo";
+String firstValue = "Cole";
+File tmpLocation = tempFolder.newFolder();
+File v3Segment = new File(
+NestedDataColumnSupplierTest.class.getClassLoader().getResource("nested_segment_v3/index.zip").getFile()
+);
+CompressionUtils.unzip(v3Segment, tmpLocation);
+try (Closer closer = Closer.create()) {
+QueryableIndex theIndex = closer.register(TestHelper.getTestIndexIO().loadIndex(tmpLocation));
+ColumnHolder holder = theIndex.getColumnHolder(columnName);
+Assert.assertNotNull(holder);
+Assert.assertEquals(NestedDataComplexTypeSerde.TYPE, holder.getCapabilities().toColumnType());
+NestedDataColumnV3<?> v3 = closer.register((NestedDataColumnV3<?>) holder.getColumn());
+Assert.assertNotNull(v3);
+List<NestedPathPart> path = ImmutableList.of(new NestedPathField("lastName"));
+ColumnHolder nestedColumnHolder = v3.getColumnHolder(path);
+Assert.assertNotNull(nestedColumnHolder);
+Assert.assertEquals(ColumnType.STRING, nestedColumnHolder.getCapabilities().toColumnType());
+NestedFieldLiteralDictionaryEncodedColumn<?> nestedColumn =
+(NestedFieldLiteralDictionaryEncodedColumn<?>) nestedColumnHolder.getColumn();
+Assert.assertNotNull(nestedColumn);
+ColumnValueSelector<?> selector = nestedColumn.makeColumnValueSelector(
+new SimpleAscendingOffset(theIndex.getNumRows())
+);
+ColumnIndexSupplier indexSupplier = v3.getColumnIndexSupplier(path);
+Assert.assertNotNull(indexSupplier);
+StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
+Assert.assertNotNull(valueSetIndex);
+BitmapColumnIndex indexForValue = valueSetIndex.forValue(firstValue);
+Assert.assertEquals(firstValue, selector.getObject());
+Assert.assertTrue(indexForValue.computeBitmapResult(resultFactory).get(0));
+}
+}
private void smokeTest(NestedDataComplexColumn column) throws IOException
{
SimpleAscendingOffset offset = new SimpleAscendingOffset(data.size());

View File

@@ -2233,7 +2233,7 @@ public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
),
ImmutableList.of(
new Object[]{"[\"$\"]", 5L},
-new Object[]{"[\"$.n.x\",\"$.array[0]\",\"$.array[1]\"]", 2L}
+new Object[]{"[\"$.array[1]\",\"$.array[0]\",\"$.n.x\"]", 2L}
),
RowSignature.builder()
.add("EXPR$0", ColumnType.STRING_ARRAY)