generic block compressed complex columns (#16863)

changes:
* Adds new `CompressedComplexColumn`, `CompressedComplexColumnSerializer`, and `CompressedComplexColumnSupplier`, based on the `CompressedVariableSizedBlobColumn` used by JSON columns.
* Adds `IndexSpec.complexMetricCompression`, which can be used to specify compression for the generic compressed complex column. Defaults to `uncompressed` because compressed columns are not backwards compatible.
* Adds a new overload of `ComplexMetricSerde.getSerializer` that accepts an `IndexSpec` argument when creating a serializer. The old signature is marked `@Deprecated` and given a default implementation that returns `null`; if an implementation overrides it to return a non-null value, the default implementation of the new method uses that serializer. Otherwise, the new default uses a `CompressedComplexColumnSerializer` when `IndexSpec.complexMetricCompression` is set to something other than null/none/uncompressed, and a `LargeColumnSupportedComplexColumnSerializer` otherwise (see the sketch after this list).
* Consolidates the duplicated generic implementations of `ComplexMetricSerde.getSerializer` and `ComplexMetricSerde.deserializeColumn` into default implementations on `ComplexMetricSerde` itself, instead of copies scattered across extensions. The default implementation of `deserializeColumn` checks whether the first byte indicates that the new compression was used, and otherwise falls back to the `GenericIndexed` based supplier.
* Complex columns with custom serializers/deserializers are unaffected and may continue to use their specialized compression or other formats; the new machinery only provides generic implementations built around `ObjectStrategy`.
* Adds `ObjectStrategy.readRetainsBufferReference` so `CompressedComplexColumn` only copies on read when required.
* Adds a `copyValuesOnRead` flag passed down to `CompressedBlockReader` to avoid duplicating the buffer when the value needs to be copied anyway.
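For reference, the decision made by the new default `getSerializer` described above amounts to roughly the following sketch. The method and property names match what appears in this diff; the exact `CompressedComplexColumnSerializer.create` factory signature is an assumption for illustration, not taken from this change.

    // Sketch of the new default implementation on ComplexMetricSerde; not the literal code.
    public GenericColumnSerializer getSerializer(
        SegmentWriteOutMedium segmentWriteOutMedium,
        String column,
        IndexSpec indexSpec
    )
    {
      // A serde that still overrides the deprecated two-argument method keeps its serializer.
      final GenericColumnSerializer legacy = getSerializer(segmentWriteOutMedium, column);
      if (legacy != null) {
        return legacy;
      }
      final CompressionStrategy compression = indexSpec.getComplexMetricCompression();
      if (compression == null
          || compression == CompressionStrategy.NONE
          || compression == CompressionStrategy.UNCOMPRESSED) {
        // Backwards-compatible GenericIndexed based layout.
        return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, getObjectStrategy());
      }
      // New block-compressed layout (assumed factory signature).
      return CompressedComplexColumnSerializer.create(segmentWriteOutMedium, column, indexSpec, getObjectStrategy());
    }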
Clint Wylie 2024-08-27 00:34:41 -07:00 committed by GitHub
parent ed3dbd6242
commit f8301a314f
100 changed files with 1155 additions and 614 deletions

View File

@ -520,6 +520,7 @@ For information on defining an `indexSpec` in a query context, see [SQL-based in
|stringDictionaryEncoding|Encoding format for string value dictionaries used by STRING and [COMPLEX<json>](../querying/nested-columns.md) columns. To enable front coding, set `stringDictionaryEncoding.type` to `frontCoded`. Optionally, you can specify the `bucketSize` and `formatVersion` properties. See [Front coding](#front-coding) for more information.|`{"type":"utf8"}`|
|metricCompression|Compression format for primitive type metric columns. Options are `lz4`, `lzf`, `zstd`, `uncompressed`, or `none` (which is more efficient than `uncompressed`, but not supported by older versions of Druid).|`lz4`|
|longEncoding|Encoding format for long-typed columns. Applies regardless of whether they are dimensions or metrics. Options are `auto` or `longs`. `auto` encodes the values using an offset or lookup table depending on column cardinality, and stores them with variable sizes. `longs` stores the values as-is, using 8 bytes each.|`longs`|
|complexMetricCompression|Compression format for complex type metric columns. Options are `lz4`, `lzf`, `zstd`, or `uncompressed`. Options other than `uncompressed` are not compatible with Druid versions older than 31, and this option only applies to complex metrics that do not have specialized column formats.|`uncompressed`|
|jsonCompression|Compression format to use for nested column raw data. Options are `lz4`, `lzf`, `zstd`, or `uncompressed`.|`lz4`|
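For example, a `tuningConfig` `indexSpec` that opts complex metrics into the new block-compressed layout might look like the following sketch (illustrative values; only `complexMetricCompression` is new here, and segments written with it cannot be read by Druid versions older than 31):

    "indexSpec": {
      "metricCompression": "lz4",
      "longEncoding": "longs",
      "complexMetricCompression": "lz4"
    }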
#### Front coding

View File

@ -69,6 +69,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn
}
@Override
@Nullable
public CompressedBigDecimal getRowValue(int rowNum)
{
int s = scale.get(rowNum);
@ -96,7 +97,8 @@ public class CompressedBigDecimalColumn implements ComplexColumn
{
return new ObjectColumnSelector<CompressedBigDecimal>()
{
@Override @Nullable
@Override
@Nullable
public CompressedBigDecimal getObject()
{
return getRowValue(offset.getOffset());

View File

@ -20,6 +20,7 @@
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
@ -73,7 +74,8 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
builder.setComplexColumnSupplier(
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer));
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer)
);
}
/* (non-Javadoc)
@ -83,7 +85,8 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
@Override
public CompressedBigDecimalLongColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column
String column,
IndexSpec indexSpec
)
{
return CompressedBigDecimalLongColumnSerializer.create(segmentWriteOutMedium, column);

View File

@ -91,4 +91,10 @@ public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<Compre
return buf.array();
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
}

View File

@ -22,17 +22,9 @@ package org.apache.druid.query.aggregation.ddsketch;
import com.datadoghq.sketch.ddsketch.DDSketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class DDSketchComplexMetricSerde extends ComplexMetricSerde
@ -84,31 +76,9 @@ public class DDSketchComplexMetricSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed<DDSketch> column = GenericIndexed.read(
buffer,
STRATEGY,
builder.getFileMapper()
);
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy<DDSketch> getObjectStrategy()
{
return STRATEGY;
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
this.getObjectStrategy()
);
}
}

View File

@ -70,4 +70,10 @@ public class DDSketchObjectStrategy implements ObjectStrategy<DDSketch>
{
return DDSketchAggregatorFactory.COMPARATOR.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.ddsketch;
import org.junit.Assert;
import org.junit.Test;
public class DDSketchObjectStrategyTest
{
@Test
public void testReadRetainsBufferReference()
{
DDSketchObjectStrategy strategy = new DDSketchObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
}

View File

@ -21,17 +21,9 @@ package org.apache.druid.query.aggregation.momentsketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchAggregatorFactory;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class MomentSketchComplexMetricSerde extends ComplexMetricSerde
{
@ -62,31 +54,9 @@ public class MomentSketchComplexMetricSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed<MomentSketchWrapper> column = GenericIndexed.read(
buffer,
STRATEGY,
builder.getFileMapper()
);
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy<MomentSketchWrapper> getObjectStrategy()
{
return STRATEGY;
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
this.getObjectStrategy()
);
}
}

View File

@ -59,4 +59,10 @@ public class MomentSketchObjectStrategy implements ObjectStrategy<MomentSketchWr
{
return MomentSketchAggregatorFactory.COMPARATOR.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.momentsketch;
import org.junit.Assert;
import org.junit.Test;
public class MomentSketchObjectStrategyTest
{
@Test
public void testReadRetainsBufferReference()
{
MomentSketchObjectStrategy strategy = new MomentSketchObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
}

View File

@ -54,8 +54,10 @@ public class RabbitStreamIndexTaskTuningConfigTest
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class)),
TuningConfig.class);
TuningConfig.class
)),
TuningConfig.class
);
Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
@ -79,26 +81,28 @@ public class RabbitStreamIndexTaskTuningConfigTest
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"rabbit\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
+ " \"type\": \"rabbit\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
RabbitStreamIndexTaskTuningConfig config = (RabbitStreamIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class)),
TuningConfig.class);
TuningConfig.class
)),
TuningConfig.class
);
Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
@ -119,62 +123,65 @@ public class RabbitStreamIndexTaskTuningConfigTest
public void testtoString() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"rabbit\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
+ " \"type\": \"rabbit\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
RabbitStreamIndexTaskTuningConfig config = (RabbitStreamIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class)),
TuningConfig.class);
TuningConfig.class
)),
TuningConfig.class
);
String resStr = "RabbitStreamSupervisorTuningConfig{" +
"maxRowsInMemory=100, " +
"maxRowsPerSegment=100, " +
"maxTotalRows=null, " +
"maxBytesInMemory=" + config.getMaxBytesInMemoryOrDefault() + ", " +
"skipBytesInMemoryOverheadCheck=false, " +
"intermediatePersistPeriod=PT1H, " +
"maxPendingPersists=100, " +
"indexSpec=IndexSpec{" +
"bitmapSerdeFactory=RoaringBitmapSerdeFactory{}, " +
"dimensionCompression=lz4, " +
"stringDictionaryEncoding=Utf8{}, " +
"metricCompression=lz4, " +
"longEncoding=longs, " +
"jsonCompression=null, " +
"segmentLoader=null" +
"}, " +
"reportParseExceptions=true, " +
"handoffConditionTimeout=100, " +
"resetOffsetAutomatically=false, " +
"segmentWriteOutMediumFactory=null, " +
"workerThreads=null, " +
"chatRetries=8, " +
"httpTimeout=PT10S, " +
"shutdownTimeout=PT80S, " +
"recordBufferSize=1000, " +
"recordBufferOfferTimeout=500, " +
"offsetFetchPeriod=PT30S, " +
"intermediateHandoffPeriod=" + config.getIntermediateHandoffPeriod() + ", " +
"logParseExceptions=false, " +
"maxParseExceptions=0, " +
"maxSavedParseExceptions=0, " +
"numPersistThreads=1, " +
"maxRecordsPerPoll=null}";
"maxRowsInMemory=100, " +
"maxRowsPerSegment=100, " +
"maxTotalRows=null, " +
"maxBytesInMemory=" + config.getMaxBytesInMemoryOrDefault() + ", " +
"skipBytesInMemoryOverheadCheck=false, " +
"intermediatePersistPeriod=PT1H, " +
"maxPendingPersists=100, " +
"indexSpec=IndexSpec{" +
"bitmapSerdeFactory=RoaringBitmapSerdeFactory{}, " +
"dimensionCompression=lz4, " +
"stringDictionaryEncoding=Utf8{}, " +
"metricCompression=lz4, " +
"longEncoding=longs, " +
"complexMetricCompression=null, " +
"jsonCompression=null, " +
"segmentLoader=null" +
"}, " +
"reportParseExceptions=true, " +
"handoffConditionTimeout=100, " +
"resetOffsetAutomatically=false, " +
"segmentWriteOutMediumFactory=null, " +
"workerThreads=null, " +
"chatRetries=8, " +
"httpTimeout=PT10S, " +
"shutdownTimeout=PT80S, " +
"recordBufferSize=1000, " +
"recordBufferOfferTimeout=500, " +
"offsetFetchPeriod=PT30S, " +
"intermediateHandoffPeriod=" + config.getIntermediateHandoffPeriod() + ", " +
"logParseExceptions=false, " +
"maxParseExceptions=0, " +
"maxSavedParseExceptions=0, " +
"numPersistThreads=1, " +
"maxRecordsPerPoll=null}";
Assert.assertEquals(resStr, config.toString());
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.spectator.histogram;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
@ -97,7 +98,11 @@ public class SpectatorHistogramComplexMetricSerde extends ComplexMetricSerde
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
public GenericColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
return SpectatorHistogramSerializer.create(
segmentWriteOutMedium,

View File

@ -21,6 +21,8 @@ package org.apache.druid.spectator.histogram;
import org.apache.druid.segment.column.ComplexColumn;
import javax.annotation.Nullable;
public class SpectatorHistogramIndexBasedComplexColumn implements ComplexColumn
{
private final SpectatorHistogramIndexed index;
@ -45,6 +47,7 @@ public class SpectatorHistogramIndexBasedComplexColumn implements ComplexColumn
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
return index.get(rowNum);

View File

@ -57,4 +57,10 @@ public class SpectatorHistogramObjectStrategy implements ObjectStrategy<Spectato
{
return SpectatorHistogramAggregatorFactory.COMPARATOR.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
}

View File

@ -267,6 +267,7 @@ public class SpectatorHistogramTest
{
SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
SpectatorHistogramObjectStrategy strategy = new SpectatorHistogramObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
SpectatorHistogramSerializer serializer = SpectatorHistogramSerializer.create(medium, "test", strategy);
serializer.open();

View File

@ -22,17 +22,9 @@ package org.apache.druid.query.aggregation.tdigestsketch;
import com.tdunning.math.stats.MergingDigest;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class TDigestSketchComplexMetricSerde extends ComplexMetricSerde
{
@ -81,31 +73,9 @@ public class TDigestSketchComplexMetricSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed<MergingDigest> column = GenericIndexed.read(
buffer,
STRATEGY,
builder.getFileMapper()
);
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy<MergingDigest> getObjectStrategy()
{
return STRATEGY;
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
this.getObjectStrategy()
);
}
}

View File

@ -60,4 +60,10 @@ public class TDigestSketchObjectStrategy implements ObjectStrategy<MergingDigest
{
return TDigestSketchAggregatorFactory.COMPARATOR.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.tdigestsketch;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
public class TDigestSketchObjectStrategyTest
{
@Test
public void testReadRetainsBufferReference()
{
TDigestSketchObjectStrategy strategy = new TDigestSketchObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
}

View File

@ -21,18 +21,10 @@ package org.apache.druid.query.aggregation.datasketches.hll;
import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.data.SafeWritableMemory;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class HllSketchMergeComplexMetricSerde extends ComplexMetricSerde
{
@ -74,22 +66,4 @@ public class HllSketchMergeComplexMetricSerde extends ComplexMetricSerde
}
};
}
@Override
public void deserializeColumn(final ByteBuffer buf, final ColumnBuilder columnBuilder)
{
columnBuilder.setComplexColumnSupplier(
new ComplexColumnPartSupplier(
getTypeName(),
GenericIndexed.read(buf, HllSketchHolderObjectStrategy.STRATEGY, columnBuilder.getFileMapper())
)
);
}
@Override
public GenericColumnSerializer getSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -23,17 +23,9 @@ import com.google.common.primitives.Doubles;
import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class KllDoublesSketchComplexMetricSerde extends ComplexMetricSerde
{
@ -95,18 +87,4 @@ public class KllDoublesSketchComplexMetricSerde extends ComplexMetricSerde
}
};
}
@Override
public void deserializeColumn(final ByteBuffer buffer, final ColumnBuilder builder)
{
final GenericIndexed<KllDoublesSketch> column = GenericIndexed.read(buffer, STRATEGY, builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
// support large columns
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -23,17 +23,9 @@ import com.google.common.primitives.Floats;
import org.apache.datasketches.kll.KllFloatsSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class KllFloatsSketchComplexMetricSerde extends ComplexMetricSerde
{
@ -95,18 +87,4 @@ public class KllFloatsSketchComplexMetricSerde extends ComplexMetricSerde
}
};
}
@Override
public void deserializeColumn(final ByteBuffer buffer, final ColumnBuilder builder)
{
final GenericIndexed<KllFloatsSketch> column = GenericIndexed.read(buffer, STRATEGY, builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
// support large columns
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -24,17 +24,9 @@ import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class DoublesSketchComplexMetricSerde extends ComplexMetricSerde
{
@ -96,18 +88,4 @@ public class DoublesSketchComplexMetricSerde extends ComplexMetricSerde
}
};
}
@Override
public void deserializeColumn(final ByteBuffer buffer, final ColumnBuilder builder)
{
final GenericIndexed<DoublesSketch> column = GenericIndexed.read(buffer, STRATEGY, builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
// support large columns
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -20,18 +20,11 @@
package org.apache.druid.query.aggregation.datasketches.theta;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class SketchMergeComplexMetricSerde extends ComplexMetricSerde
{
@ -64,23 +57,9 @@ public class SketchMergeComplexMetricSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
GenericIndexed<SketchHolder> ge = GenericIndexed.read(buffer, strategy, builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), ge));
}
@Override
public ObjectStrategy<SketchHolder> getObjectStrategy()
{
return strategy;
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -21,17 +21,9 @@ package org.apache.druid.query.aggregation.datasketches.tuple;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
public class ArrayOfDoublesSketchMergeComplexMetricSerde extends ComplexMetricSerde
{
@ -65,23 +57,9 @@ public class ArrayOfDoublesSketchMergeComplexMetricSerde extends ComplexMetricSe
};
}
@Override
public void deserializeColumn(final ByteBuffer buffer, final ColumnBuilder builder)
{
final GenericIndexed<ArrayOfDoublesSketch> ge = GenericIndexed.read(buffer, ArrayOfDoublesSketchObjectStrategy.STRATEGY, builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), ge));
}
@Override
public ObjectStrategy<ArrayOfDoublesSketch> getObjectStrategy()
{
return ArrayOfDoublesSketchObjectStrategy.STRATEGY;
}
@Override
public GenericColumnSerializer getSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -42,6 +42,7 @@ public class HllSketchHolderObjectStrategyTest
ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
HllSketchHolderObjectStrategy objectStrategy = new HllSketchHolderObjectStrategy();
Assert.assertTrue(objectStrategy.readRetainsBufferReference());
// valid sketch should not explode when copied, which reads the memory
objectStrategy.fromByteBufferSafe(buf, bytes.length).getSketch().copy();

View File

@ -102,6 +102,7 @@ public class KllDoublesSketchComplexMetricSerdeTest
{
final KllDoublesSketchComplexMetricSerde serde = new KllDoublesSketchComplexMetricSerde();
final ObjectStrategy<KllDoublesSketch> objectStrategy = serde.getObjectStrategy();
Assert.assertTrue(objectStrategy.readRetainsBufferReference());
KllDoublesSketch sketch = KllDoublesSketch.newHeapInstance();
sketch.update(1.1);

View File

@ -102,6 +102,7 @@ public class KllFloatsSketchComplexMetricSerdeTest
{
final KllFloatsSketchComplexMetricSerde serde = new KllFloatsSketchComplexMetricSerde();
final ObjectStrategy<KllFloatsSketch> objectStrategy = serde.getObjectStrategy();
Assert.assertTrue(objectStrategy.readRetainsBufferReference());
KllFloatsSketch sketch = KllFloatsSketch.newHeapInstance();
sketch.update(1.1f);

View File

@ -108,6 +108,7 @@ public class DoublesSketchComplexMetricSerdeTest
ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
ObjectStrategy<DoublesSketch> objectStrategy = serde.getObjectStrategy();
Assert.assertTrue(objectStrategy.readRetainsBufferReference());
// valid sketch should not explode when copied, which reads the memory
objectStrategy.fromByteBufferSafe(buf, bytes.length).toByteArray(true);

View File

@ -36,6 +36,7 @@ public class SketchHolderObjectStrategyTest
public void testSafeRead()
{
SketchHolderObjectStrategy objectStrategy = new SketchHolderObjectStrategy();
Assert.assertTrue(objectStrategy.readRetainsBufferReference());
Union union = (Union) SetOperation.builder().setNominalEntries(1024).build(Family.UNION);
union.update(1234L);

View File

@ -33,6 +33,7 @@ public class ArrayOfDoublesSketchObjectStrategyTest
public void testSafeRead()
{
ArrayOfDoublesSketchObjectStrategy objectStrategy = new ArrayOfDoublesSketchObjectStrategy();
Assert.assertTrue(objectStrategy.readRetainsBufferReference());
ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(1024)
.setNumberOfValues(4)
.build();

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.bloom;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
@ -58,7 +59,11 @@ public class BloomFilterSerde extends ComplexMetricSerde
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
public GenericColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
throw new UnsupportedOperationException("Bloom filter aggregators are query-time only");
}

View File

@ -22,15 +22,9 @@ package org.apache.druid.query.aggregation.histogram;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
import java.util.Collection;
@ -85,20 +79,6 @@ public class ApproximateHistogramFoldingSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
{
final GenericIndexed<ApproximateHistogram> column =
GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
@Override
public ObjectStrategy<ApproximateHistogram> getObjectStrategy()
{
@ -131,6 +111,12 @@ public class ApproximateHistogramFoldingSerde extends ComplexMetricSerde
{
return ApproximateHistogramAggregator.COMPARATOR.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}
}

View File

@ -25,15 +25,9 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -131,13 +125,6 @@ public class FixedBucketsHistogramSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), builder.getFileMapper());
builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy getObjectStrategy()
{
@ -172,15 +159,12 @@ public class FixedBucketsHistogramSerde extends ComplexMetricSerde
{
return comparator.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}
@Override
public GenericColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column
)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.junit.Assert;
import org.junit.Test;
@ -75,6 +76,15 @@ public class ApproximateHistogramFoldingSerdeTest
);
}
@Test
public void testReadRetainsBufferReference()
{
final ApproximateHistogramFoldingSerde serde = new ApproximateHistogramFoldingSerde();
final ObjectStrategy<ApproximateHistogram> strategy = serde.getObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
public static ApproximateHistogram makeHistogram(final float... floats)
{
final ApproximateHistogram histogram = new ApproximateHistogram();

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation.histogram;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.data.ObjectStrategy;
import org.junit.Assert;
import org.junit.Test;
@ -1503,4 +1504,12 @@ public class FixedBucketsHistogramTest
FixedBucketsHistogram fromBase64 = FixedBucketsHistogram.fromBase64(asBase64Full);
Assert.assertEquals(hSparse, fromBase64);
}
@Test
public void testObjectStrategyReadRetainsBufferReference()
{
FixedBucketsHistogramSerde serde = new FixedBucketsHistogramSerde();
ObjectStrategy<?> strategy = serde.getObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
}
}

View File

@ -2650,7 +2650,7 @@ public class MSQReplaceTest extends MSQTestBase
);
}
IndexSpec indexSpec = new IndexSpec(null, null, null, null, null, null, null);
IndexSpec indexSpec = IndexSpec.DEFAULT;
GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity.getDefaultGranularity(),
GranularityType.NONE.getDefaultGranularity(),

View File

@ -21,15 +21,9 @@ package org.apache.druid.query.aggregation.variance;
import com.google.common.collect.Ordering;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
import java.util.List;
@ -82,13 +76,6 @@ public class VarianceSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy getObjectStrategy()
{
@ -118,13 +105,12 @@ public class VarianceSerde extends ComplexMetricSerde
{
return COMPARATOR.compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -35,6 +35,7 @@ public class VarianceSerdeTest
Random r = ThreadLocalRandom.current();
VarianceAggregatorCollector holder = new VarianceAggregatorCollector();
ObjectStrategy strategy = new VarianceSerde().getObjectStrategy();
Assert.assertFalse(strategy.readRetainsBufferReference());
Assert.assertEquals(VarianceAggregatorCollector.class, strategy.getClazz());
for (int i = 0; i < 100; i++) {

View File

@ -185,6 +185,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
// Need bounds checking, since getObjectForPhysicalRow doesn't do it.

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.serde.cell.ByteBufferProvider;
import org.apache.druid.segment.serde.cell.CellReader;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -63,6 +64,7 @@ public class SerializablePairLongDoubleComplexColumn implements ComplexColumn
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
return serde.deserialize(cellReader.getCell(rowNum));

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation;
import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
@ -58,7 +59,11 @@ public class SerializablePairLongDoubleComplexMetricSerde extends AbstractSerial
}
@Override
public GenericColumnSerializer<SerializablePairLongDouble> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
public GenericColumnSerializer<SerializablePairLongDouble> getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
return new SerializablePairLongDoubleColumnSerializer(
segmentWriteOutMedium,
@ -108,6 +113,12 @@ public class SerializablePairLongDoubleComplexMetricSerde extends AbstractSerial
{
return SERDE.serialize(inPair);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.serde.cell.ByteBufferProvider;
import org.apache.druid.segment.serde.cell.CellReader;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -63,6 +64,7 @@ public class SerializablePairLongFloatComplexColumn implements ComplexColumn
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
return serde.deserialize(cellReader.getCell(rowNum));

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation;
import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
@ -58,7 +59,11 @@ public class SerializablePairLongFloatComplexMetricSerde extends AbstractSeriali
}
@Override
public GenericColumnSerializer<SerializablePairLongFloat> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
public GenericColumnSerializer<SerializablePairLongFloat> getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
return new SerializablePairLongFloatColumnSerializer(
segmentWriteOutMedium,
@ -109,6 +114,12 @@ public class SerializablePairLongFloatComplexMetricSerde extends AbstractSeriali
{
return SERDE.serialize(inPair);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.serde.cell.ByteBufferProvider;
import org.apache.druid.segment.serde.cell.CellReader;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -63,6 +64,7 @@ public class SerializablePairLongLongComplexColumn implements ComplexColumn
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
return serde.deserialize(cellReader.getCell(rowNum));

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.aggregation;
import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
@ -58,7 +59,11 @@ public class SerializablePairLongLongComplexMetricSerde extends AbstractSerializ
}
@Override
public GenericColumnSerializer<SerializablePairLongLong> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
public GenericColumnSerializer<SerializablePairLongLong> getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
return new SerializablePairLongLongColumnSerializer(
segmentWriteOutMedium,
@ -108,6 +113,12 @@ public class SerializablePairLongLongComplexMetricSerde extends AbstractSerializ
{
return SERDE.serialize(inPair);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.serde.cell.ByteBufferProvider;
import org.apache.druid.segment.serde.cell.CellReader;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -64,6 +65,7 @@ public class SerializablePairLongStringComplexColumn implements ComplexColumn
@SuppressWarnings("ConstantConditions")
@Override
@Nullable
public Object getRowValue(int rowNum)
{
// This can return nulls, meaning that it is expected that anything reading from this does

View File

@ -24,6 +24,7 @@ import org.apache.druid.collections.SerializablePair;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
@ -193,7 +194,11 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS
}
@Override
public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
public GenericColumnSerializer<?> getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
if (compressionEnabled) {
return new SerializablePairLongStringColumnSerializer(
@ -263,5 +268,11 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS
return bbuf.array();
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}

View File

@ -22,15 +22,9 @@ package org.apache.druid.query.aggregation.hyperloglog;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.hll.HyperLogLogHash;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
import java.util.Comparator;
@ -96,13 +90,6 @@ public class HyperUniquesSerde extends ComplexMetricSerde
};
}
@Override
public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy(), columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
}
@Override
public ObjectStrategy getObjectStrategy()
{
@ -143,11 +130,4 @@ public class HyperUniquesSerde extends ComplexMetricSerde
}
};
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
}
}

View File

@ -732,7 +732,7 @@ public class IndexMergerV9 implements IndexMerger
if (serde == null) {
throw new ISE("Unknown type[%s]", type.getComplexTypeName());
}
writer = serde.getSerializer(segmentWriteOutMedium, metric);
writer = serde.getSerializer(segmentWriteOutMedium, metric, indexSpec);
break;
default:
throw new ISE("Unknown type[%s]", type);

View File

@ -55,7 +55,8 @@ public class IndexSpec
private final StringEncodingStrategy stringDictionaryEncoding;
private final CompressionStrategy metricCompression;
private final CompressionFactory.LongEncodingStrategy longEncoding;
@Nullable
private final CompressionStrategy complexMetricCompression;
@Nullable
private final CompressionStrategy jsonCompression;
@Nullable
@ -84,6 +85,7 @@ public class IndexSpec
@JsonProperty("stringDictionaryEncoding") @Nullable StringEncodingStrategy stringDictionaryEncoding,
@JsonProperty("metricCompression") @Nullable CompressionStrategy metricCompression,
@JsonProperty("longEncoding") @Nullable CompressionFactory.LongEncodingStrategy longEncoding,
@JsonProperty("complexMetricCompression") @Nullable CompressionStrategy complexMetricCompression,
@JsonProperty("jsonCompression") @Nullable CompressionStrategy jsonCompression,
@JsonProperty("segmentLoader") @Nullable SegmentizerFactory segmentLoader
)
@ -101,6 +103,7 @@ public class IndexSpec
this.metricCompression = metricCompression == null
? CompressionStrategy.DEFAULT_COMPRESSION_STRATEGY
: metricCompression;
this.complexMetricCompression = complexMetricCompression;
this.longEncoding = longEncoding == null
? CompressionFactory.DEFAULT_LONG_ENCODING_STRATEGY
: longEncoding;
@ -138,6 +141,14 @@ public class IndexSpec
return longEncoding;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public CompressionStrategy getComplexMetricCompression()
{
return complexMetricCompression;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@ -179,6 +190,7 @@ public class IndexSpec
Objects.equals(stringDictionaryEncoding, indexSpec.stringDictionaryEncoding) &&
metricCompression == indexSpec.metricCompression &&
longEncoding == indexSpec.longEncoding &&
Objects.equals(complexMetricCompression, indexSpec.complexMetricCompression) &&
Objects.equals(jsonCompression, indexSpec.jsonCompression) &&
Objects.equals(segmentLoader, indexSpec.segmentLoader);
}
@ -192,6 +204,7 @@ public class IndexSpec
stringDictionaryEncoding,
metricCompression,
longEncoding,
complexMetricCompression,
jsonCompression,
segmentLoader
);
@ -206,6 +219,7 @@ public class IndexSpec
", stringDictionaryEncoding=" + stringDictionaryEncoding +
", metricCompression=" + metricCompression +
", longEncoding=" + longEncoding +
", complexMetricCompression=" + complexMetricCompression +
", jsonCompression=" + jsonCompression +
", segmentLoader=" + segmentLoader +
'}';
@ -224,6 +238,8 @@ public class IndexSpec
@Nullable
private CompressionFactory.LongEncodingStrategy longEncoding;
@Nullable
private CompressionStrategy complexMetricCompression;
@Nullable
private CompressionStrategy jsonCompression;
@Nullable
private SegmentizerFactory segmentLoader;
@ -252,6 +268,12 @@ public class IndexSpec
return this;
}
public Builder withComplexMetricCompression(CompressionStrategy complexMetricCompression)
{
this.complexMetricCompression = complexMetricCompression;
return this;
}
public Builder withLongEncoding(CompressionFactory.LongEncodingStrategy longEncoding)
{
this.longEncoding = longEncoding;
@ -278,6 +300,7 @@ public class IndexSpec
stringDictionaryEncoding,
metricCompression,
longEncoding,
complexMetricCompression,
jsonCompression,
segmentLoader
);

View File

@ -52,9 +52,11 @@ public interface ComplexColumn extends BaseColumn
/**
* Return rows in the column.
*
* @param rowNum the row number
* @return row object of type same as {@link ComplexColumn#getClazz()} } at row number "rowNum" .
*/
@Nullable
Object getRowValue(int rowNum);
/**

View File

@ -21,6 +21,8 @@ package org.apache.druid.segment.column;
import org.apache.druid.segment.data.GenericIndexed;
import javax.annotation.Nullable;
/**
* Implementation of {@link ComplexColumn} to be used when complex column serialization is done by using
* {@link GenericIndexed} by using default implementations of "writeToXXX" methods in
@ -50,6 +52,7 @@ public class GenericIndexedBasedComplexColumn implements ComplexColumn
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
return index.get(rowNum);

View File

@ -106,9 +106,13 @@ public final class NullableTypeStrategy<T> implements Comparator<T>, Hash.Strate
}
/**
* Whether the {@link #read} methods return an object that may retain a reference to the provided {@link ByteBuffer}.
* If a reference is sometimes retained, this method returns true. It returns false if, and only if, a reference
* is *never* retained.
* Whether the {@link #read} methods return an object that may retain a reference to the underlying memory of the
* provided {@link ByteBuffer}. If a reference is sometimes retained, this method returns true. It returns false if,
* and only if, a reference is *never* retained.
* <p>
* If this method returns true, and the caller does not control the lifecycle of the underlying memory or cannot
* ensure that it will not change over the lifetime of the returned object, callers should copy the memory to a new
* location that they do control the lifecycle of and will be available for the duration of the returned object.
*/
public boolean readRetainsBufferReference()
{

View File

@ -82,8 +82,7 @@ public class ObjectStrategyComplexTypeStrategy<T> implements TypeStrategy<T>
@Override
public boolean readRetainsBufferReference()
{
// Can't guarantee that ObjectStrategy *doesn't* retain a reference.
return true;
return objectStrategy.readRetainsBufferReference();
}
@Override

View File

@ -87,9 +87,13 @@ public interface TypeStrategy<T> extends Comparator<Object>, Hash.Strategy<T>
T read(ByteBuffer buffer);
/**
* Whether the {@link #read} methods return an object that may retain a reference to the provided {@link ByteBuffer}.
* If a reference is sometimes retained, this method returns true. It returns false if, and only if, a reference
* is *never* retained.
* Whether the {@link #read} methods return an object that may retain a reference to the underlying memory of the
* provided {@link ByteBuffer}. If a reference is sometimes retained, this method returns true. It returns false if,
* and only if, a reference is *never* retained.
* <p>
* If this method returns true, and the caller does not control the lifecycle of the underlying memory or cannot
* ensure that it will not change over the lifetime of the returned object, callers should copy the memory to a new
* location that they do control the lifecycle of and will be available for the duration of the returned object.
*/
boolean readRetainsBufferReference();
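A hedged caller-side sketch of how this contract is meant to be used; `strategy`, `buffer`, and the value type `Foo` are assumptions for illustration and not part of this patch.

// defensively copy before read() when the strategy may pin the mapped buffer
// (`strategy` is a TypeStrategy<Foo>, `buffer` is positioned/limited to one serialized value)
final Foo value;
if (strategy.readRetainsBufferReference()) {
  final ByteBuffer heapCopy = ByteBuffer.allocate(buffer.remaining()).order(buffer.order());
  heapCopy.put(buffer.duplicate());
  heapCopy.flip();
  value = strategy.read(heapCopy);
} else {
  value = strategy.read(buffer);
}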

View File

@ -49,8 +49,8 @@ public class UnknownTypeComplexColumn implements ComplexColumn
return "UNKNOWN_COMPLEX_COLUMN_TYPE";
}
@Nullable
@Override
@Nullable
public Object getRowValue(int rowNum)
{
return null;

View File

@ -56,7 +56,11 @@ public final class CompressedBlockReader implements Closeable
private static final ByteBuffer NULL_VALUE = ByteBuffer.wrap(new byte[0]);
public static final byte VERSION = 0x01;
public static Supplier<CompressedBlockReader> fromByteBuffer(ByteBuffer buffer, ByteOrder byteOrder)
public static Supplier<CompressedBlockReader> fromByteBuffer(
ByteBuffer buffer,
ByteOrder byteOrder,
boolean copyValuesOnRead
)
{
byte versionFromBuffer = buffer.get();
@ -87,6 +91,7 @@ public final class CompressedBlockReader implements Closeable
compression,
numBlocks,
blockSize,
copyValuesOnRead,
offsetView.asReadOnlyBuffer(),
compressedDataView.asReadOnlyBuffer().order(byteOrder),
byteOrder
@ -97,6 +102,7 @@ public final class CompressedBlockReader implements Closeable
private final CompressionStrategy.Decompressor decompressor;
private final boolean copyValuesOnRead;
private final int numBlocks;
private final int div;
private final int rem;
@ -114,12 +120,14 @@ public final class CompressedBlockReader implements Closeable
CompressionStrategy compressionStrategy,
int numBlocks,
int blockSize,
boolean copyValuesOnRead,
IntBuffer endOffsetsBuffer,
ByteBuffer compressedDataBuffer,
ByteOrder byteOrder
)
{
this.decompressor = compressionStrategy.getDecompressor();
this.copyValuesOnRead = copyValuesOnRead;
this.numBlocks = numBlocks;
this.div = Integer.numberOfTrailingZeros(blockSize);
this.rem = blockSize - 1;
@ -169,18 +177,25 @@ public final class CompressedBlockReader implements Closeable
if (size == 0) {
return NULL_VALUE;
}
final int startBlockOffset = loadBlock(startOffset);
final int startBlockNumber = currentBlockNumber;
decompressedDataBuffer.position(startBlockOffset);
// patch together value from n underlying compressed pages
// possibly patch together value from n underlying compressed pages
if (size < decompressedDataBuffer.remaining()) {
// sweet, same buffer, we can slice out a view directly to the value
final ByteBuffer dupe = decompressedDataBuffer.duplicate().order(byteOrder);
dupe.position(startBlockOffset).limit(startBlockOffset + size);
return dupe.slice().order(byteOrder);
// sweet, same buffer
if (copyValuesOnRead) {
// caller specified copyValuesOnRead, so copy the memory to a heap byte array
final byte[] bytes = new byte[size];
decompressedDataBuffer.get(bytes, 0, size);
return ByteBuffer.wrap(bytes).order(byteOrder);
} else {
// if we don't need to copy, we can return the buffer directly with position and limit set
final ByteBuffer dupe = decompressedDataBuffer.duplicate().order(byteOrder);
dupe.position(startBlockOffset).limit(startBlockOffset + size);
return dupe;
}
} else {
// spans multiple blocks, copy on heap
// spans multiple blocks, always copy on heap
final byte[] bytes = new byte[size];
int bytesRead = 0;
int block = startBlockNumber;

View File

@ -29,7 +29,7 @@ public final class CompressedLongsReader implements ColumnarLongs
{
public static Supplier<CompressedLongsReader> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
final Supplier<CompressedBlockReader> baseReader = CompressedBlockReader.fromByteBuffer(buffer, order);
final Supplier<CompressedBlockReader> baseReader = CompressedBlockReader.fromByteBuffer(buffer, order, false);
return () -> new CompressedLongsReader(baseReader.get());
}

View File

@ -37,6 +37,17 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<Compr
ByteOrder order,
SmooshedFileMapper mapper
) throws IOException
{
return fromByteBuffer(filenameBase, buffer, order, false, mapper);
}
public static CompressedVariableSizedBlobColumnSupplier fromByteBuffer(
String filenameBase,
ByteBuffer buffer,
ByteOrder order,
boolean copyValuesOnRead,
SmooshedFileMapper mapper
) throws IOException
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
@ -48,7 +59,7 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<Compr
final ByteBuffer dataBuffer = mapper.mapFile(
CompressedVariableSizedBlobColumnSerializer.getCompressedBlobsFileName(filenameBase)
);
return new CompressedVariableSizedBlobColumnSupplier(offsetsBuffer, dataBuffer, order, numElements);
return new CompressedVariableSizedBlobColumnSupplier(offsetsBuffer, dataBuffer, order, numElements, copyValuesOnRead);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
@ -58,16 +69,17 @@ public class CompressedVariableSizedBlobColumnSupplier implements Supplier<Compr
private final Supplier<CompressedLongsReader> offsetReaderSupplier;
private final Supplier<CompressedBlockReader> blockDataReaderSupplier;
public CompressedVariableSizedBlobColumnSupplier(
private CompressedVariableSizedBlobColumnSupplier(
ByteBuffer offsetsBuffer,
ByteBuffer dataBuffer,
ByteOrder order,
int numElements
int numElements,
boolean copyValuesOnRead
)
{
this.numElements = numElements;
this.offsetReaderSupplier = CompressedLongsReader.fromByteBuffer(offsetsBuffer, order);
this.blockDataReaderSupplier = CompressedBlockReader.fromByteBuffer(dataBuffer, order);
this.blockDataReaderSupplier = CompressedBlockReader.fromByteBuffer(dataBuffer, order, copyValuesOnRead);
}
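How the new flag is meant to be threaded through is sketched below (hypothetical wiring, not part of this diff): a generic complex column supplier can derive `copyValuesOnRead` from its `ObjectStrategy`, so each value is copied at most once. `fileNameBase`, `dataBuffer`, `objectStrategy`, and `smooshMapper` are assumed to be in scope.

// copy on read only when the strategy may retain a reference to the buffer
CompressedVariableSizedBlobColumnSupplier rawSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
    fileNameBase,
    dataBuffer,
    ByteOrder.nativeOrder(),
    objectStrategy.readRetainsBufferReference(),
    smooshMapper
);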
@Override

View File

@ -167,6 +167,12 @@ public abstract class GenericIndexed<T> implements CloseableIndexed<T>, Serializ
{
return Comparators.<String>naturalNullsFirst().compare(o1, o2);
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.data;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.writeout.WriteOutBytes;
import javax.annotation.Nullable;
@ -59,6 +60,23 @@ public interface ObjectStrategy<T> extends Comparator<T>
return true;
}
/**
* Whether the {@link #fromByteBuffer(ByteBuffer, int)}, {@link #fromByteBufferWithSize(ByteBuffer)}, and
* {@link #fromByteBufferSafe(ByteBuffer, int)} methods return an object that may retain a reference to the underlying
* memory provided by a {@link ByteBuffer}. If a reference is sometimes retained, this method returns true. It
* returns false if, and only if, a reference is *never* retained.
* <p>
* If this method returns true, and the caller does not control the lifecycle of the underlying memory or cannot
* ensure that it will not change over the lifetime of the returned object, callers should copy the memory to a new
* location that they do control the lifecycle of and will be available for the duration of the returned object.
*
* @see TypeStrategy#readRetainsBufferReference()
*/
default boolean readRetainsBufferReference()
{
return true;
}
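For a strategy that always materializes values onto the heap, a hedged override looks like the following (illustrative only):

@Override
public boolean readRetainsBufferReference()
{
  // this strategy decodes into a new heap object and never keeps the ByteBuffer,
  // so callers that check this method can skip the defensive copy
  return false;
}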
/**
* Reads 4-bytes numBytes from the given buffer, and then delegates to {@link #fromByteBuffer(ByteBuffer, int)}.
*/

View File

@ -43,6 +43,20 @@ public class GeneratorBasicSchemas
{
private static final ImmutableMap.Builder<String, GeneratorSchemaInfo> SCHEMA_INFO_BUILDER = ImmutableMap.builder();
public static final String BASIC_SCHEMA = "basic";
public static final String BASIC_SCHEMA_EXPRESSION = "expression";
public static final String SIMPLE_SCHEMA = "simple";
public static final String SIMPLE_LONG_SCHEMA = "simpleLong";
public static final String SIMPLE_FLOAT_SCHEMA = "simpleFloat";
public static final String SIMPLE_NULLS_SCHEMA = "nulls";
public static final String ROLLUP_SCHEMA = "rollo";
public static final String NULLABLE_TEST_SCHEMA = "nullable";
public static final String EXPRESSION_TESTBENCH_SCHEMA = "expression-testbench";
public static final String NESTED_TESTBENCH_SCHEMA = "nested";
public static final String GROUPBY_TESTBENCH_SCHEMA = "groupBy-testbench";
public static final String IN_TESTBENCH_SCHEMA = "in-testbench";
public static final String WIDE_SCHEMA = "wide";
static {
// basic schema
List<GeneratorColumnSchema> basicSchemaColumns = ImmutableList.of(
@ -122,8 +136,8 @@ public class GeneratorBasicSchemas
true
);
SCHEMA_INFO_BUILDER.put("basic", basicSchema);
SCHEMA_INFO_BUILDER.put("expression", basicSchemaExpression);
SCHEMA_INFO_BUILDER.put(BASIC_SCHEMA, basicSchema);
SCHEMA_INFO_BUILDER.put(BASIC_SCHEMA_EXPRESSION, basicSchemaExpression);
}
static { // simple single string column and count agg schema, no rollup
@ -143,7 +157,7 @@ public class GeneratorBasicSchemas
basicSchemaDataInterval,
false
);
SCHEMA_INFO_BUILDER.put("simple", basicSchema);
SCHEMA_INFO_BUILDER.put(SIMPLE_SCHEMA, basicSchema);
}
static {
@ -165,7 +179,7 @@ public class GeneratorBasicSchemas
basicSchemaDataInterval,
false
);
SCHEMA_INFO_BUILDER.put("simpleLong", basicSchema);
SCHEMA_INFO_BUILDER.put(SIMPLE_LONG_SCHEMA, basicSchema);
}
static {
@ -187,7 +201,7 @@ public class GeneratorBasicSchemas
basicSchemaDataInterval,
false
);
SCHEMA_INFO_BUILDER.put("simpleFloat", basicSchema);
SCHEMA_INFO_BUILDER.put(SIMPLE_FLOAT_SCHEMA, basicSchema);
}
static {
@ -238,7 +252,7 @@ public class GeneratorBasicSchemas
basicSchemaDataInterval,
true
);
SCHEMA_INFO_BUILDER.put("rollo", rolloSchema);
SCHEMA_INFO_BUILDER.put(ROLLUP_SCHEMA, rolloSchema);
}
static {
@ -268,7 +282,7 @@ public class GeneratorBasicSchemas
false
);
SCHEMA_INFO_BUILDER.put("nulls", nullsSchema);
SCHEMA_INFO_BUILDER.put(SIMPLE_NULLS_SCHEMA, nullsSchema);
}
static {
@ -309,7 +323,7 @@ public class GeneratorBasicSchemas
false
);
SCHEMA_INFO_BUILDER.put("nulls-and-non-nulls", nullsSchema);
SCHEMA_INFO_BUILDER.put(NULLABLE_TEST_SCHEMA, nullsSchema);
}
static {
@ -362,7 +376,8 @@ public class GeneratorBasicSchemas
false
);
SCHEMA_INFO_BUILDER.put("expression-testbench", expressionsTestsSchema);
SCHEMA_INFO_BUILDER.put(EXPRESSION_TESTBENCH_SCHEMA, expressionsTestsSchema);
SCHEMA_INFO_BUILDER.put(NESTED_TESTBENCH_SCHEMA, expressionsTestsSchema);
}
static {
@ -406,7 +421,7 @@ public class GeneratorBasicSchemas
false
);
SCHEMA_INFO_BUILDER.put("groupBy-testbench", groupByTestsSchema);
SCHEMA_INFO_BUILDER.put(GROUPBY_TESTBENCH_SCHEMA, groupByTestsSchema);
}
static {
@ -426,7 +441,7 @@ public class GeneratorBasicSchemas
false
);
SCHEMA_INFO_BUILDER.put("in-testbench", expressionsTestsSchema);
SCHEMA_INFO_BUILDER.put(IN_TESTBENCH_SCHEMA, expressionsTestsSchema);
}
@ -532,7 +547,7 @@ public class GeneratorBasicSchemas
false
);
SCHEMA_INFO_BUILDER.put("wide", nullsSchema);
SCHEMA_INFO_BUILDER.put(WIDE_SCHEMA, nullsSchema);
}
public static final Map<String, GeneratorSchemaInfo> SCHEMA_MAP = SCHEMA_INFO_BUILDER.build();

View File

@ -310,8 +310,8 @@ public abstract class CompressedNestedDataComplexColumn<TStringDictionary extend
return nullValues;
}
@Nullable
@Override
@Nullable
public Object getRowValue(int rowNum)
{
if (nullValues.get(rowNum)) {

View File

@ -34,6 +34,7 @@ import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
@ -227,9 +228,9 @@ public final class DictionaryIdLookup implements Closeable
// mapper so that we can have a mutable smoosh)
File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
stringDictionaryFile = stringSmoosh.toPath();
final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName(
final String fileName = ColumnSerializerUtils.getInternalFileName(
name,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
try (
@ -259,7 +260,7 @@ public final class DictionaryIdLookup implements Closeable
private void ensureLongDictionaryLoaded()
{
if (longDictionary == null) {
longDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
longDictionaryFile = makeTempFile(name + ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
// reset position
@ -270,7 +271,7 @@ public final class DictionaryIdLookup implements Closeable
private void ensureDoubleDictionaryLoaded()
{
if (doubleDictionary == null) {
doubleDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
doubleDictionaryFile = makeTempFile(name + ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
doubleDictionary = FixedIndexed.read(
doubleBuffer,
@ -286,7 +287,7 @@ public final class DictionaryIdLookup implements Closeable
private void ensureArrayDictionaryLoaded()
{
if (arrayDictionary == null && arrayDictionaryWriter != null) {
arrayDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
arrayDictionaryFile = makeTempFile(name + ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
// reset position

View File

@ -40,6 +40,7 @@ import org.apache.druid.segment.data.FixedIndexedIntWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@ -293,7 +294,7 @@ public abstract class GlobalDictionaryEncodedFieldColumnWriter<T>
}
}
};
final String fieldFileName = NestedCommonFormatColumnSerializer.getInternalFileName(columnName, fieldName);
final String fieldFileName = ColumnSerializerUtils.getInternalFileName(columnName, fieldName);
final long size = fieldSerializer.getSerializedSize();
log.debug("Column [%s] serializing [%s] field of size [%d].", columnName, fieldName, size);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(fieldFileName, size)) {

View File

@ -21,9 +21,9 @@ package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer;
import java.io.IOException;
@ -50,18 +50,7 @@ import java.util.SortedMap;
public abstract class NestedCommonFormatColumnSerializer implements GenericColumnSerializer<StructuredData>
{
public static final byte V0 = 0x00;
public static final String STRING_DICTIONARY_FILE_NAME = "__stringDictionary";
public static final String LONG_DICTIONARY_FILE_NAME = "__longDictionary";
public static final String DOUBLE_DICTIONARY_FILE_NAME = "__doubleDictionary";
public static final String ARRAY_DICTIONARY_FILE_NAME = "__arrayDictionary";
public static final String ARRAY_ELEMENT_DICTIONARY_FILE_NAME = "__arrayElementDictionary";
public static final String ENCODED_VALUE_COLUMN_FILE_NAME = "__encodedColumn";
public static final String LONG_VALUE_COLUMN_FILE_NAME = "__longColumn";
public static final String DOUBLE_VALUE_COLUMN_FILE_NAME = "__doubleColumn";
public static final String BITMAP_INDEX_FILE_NAME = "__valueIndexes";
public static final String ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME = "__arrayElementIndexes";
public static final String RAW_FILE_NAME = "__raw";
public static final String NULL_BITMAP_FILE_NAME = "__nullIndex";
public static final String NESTED_FIELD_PREFIX = "__field_";
public abstract void openDictionaryWriter() throws IOException;
@ -86,18 +75,12 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
protected void writeInternal(FileSmoosher smoosher, Serializer serializer, String fileName) throws IOException
{
final String internalName = getInternalFileName(getColumnName(), fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
serializer.writeTo(smooshChannel, smoosher);
}
ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(), fileName);
}
protected void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String fileName) throws IOException
{
final String internalName = getInternalFileName(getColumnName(), fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
smooshChannel.write(buffer);
}
ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), fileName);
}
protected void writeV0Header(WritableByteChannel channel, ByteBuffer columnNameBuffer) throws IOException
@ -108,8 +91,7 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
protected ByteBuffer computeFilenameBytes()
{
final String columnName = getColumnName();
final byte[] bytes = StringUtils.toUtf8(columnName);
final byte[] bytes = StringUtils.toUtf8(getColumnName());
final int length = VByte.computeIntSize(bytes.length);
final ByteBuffer buffer = ByteBuffer.allocate(length + bytes.length).order(ByteOrder.nativeOrder());
VByte.writeInt(buffer, bytes.length);
@ -117,12 +99,4 @@ public abstract class NestedCommonFormatColumnSerializer implements GenericColum
buffer.flip();
return buffer;
}
/**
* Nested field columns are stored in separate internal files within the segment.
*/
public static String getInternalFileName(String fileNameBase, String field)
{
return fileNameBase + "." + field;
}
}

View File

@ -34,7 +34,6 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.StringEncodingStrategies;
@ -48,10 +47,10 @@ import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -248,7 +247,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
public void open() throws IOException
{
rawWriter = new CompressedVariableSizedBlobColumnSerializer(
getInternalFileName(name, RAW_FILE_NAME),
ColumnSerializerUtils.getInternalFileName(name, RAW_FILE_NAME),
segmentWriteOutMedium,
indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4
);
@ -394,18 +393,6 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
{
if (!closedForWrite) {
closedForWrite = true;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IndexMerger.SERIALIZER_UTILS.writeString(
baos,
NestedDataComplexTypeSerde.OBJECT_MAPPER.writeValueAsString(
new NestedDataColumnMetadata(
ByteOrder.nativeOrder(),
indexSpec.getBitmapSerdeFactory(),
name,
!nullRowsBitmap.isEmpty()
)
)
);
nullBitmapWriter.write(nullRowsBitmap);
columnNameBytes = computeFilenameBytes();
}
@ -447,26 +434,26 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
smoosher.add(internalName, fileMapper.mapFile(internalName));
}
} else {
writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
if (globalDictionaryIdLookup.getLongBuffer() != null) {
writeInternal(smoosher, globalDictionaryIdLookup.getLongBuffer(), LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, globalDictionaryIdLookup.getLongBuffer(), ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, longDictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
if (globalDictionaryIdLookup.getDoubleBuffer() != null) {
writeInternal(smoosher, globalDictionaryIdLookup.getDoubleBuffer(), DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, globalDictionaryIdLookup.getDoubleBuffer(), ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, doubleDictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
if (globalDictionaryIdLookup.getArrayBuffer() != null) {
writeInternal(smoosher, globalDictionaryIdLookup.getArrayBuffer(), ARRAY_DICTIONARY_FILE_NAME);
writeInternal(smoosher, globalDictionaryIdLookup.getArrayBuffer(), ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, arrayDictionaryWriter, ARRAY_DICTIONARY_FILE_NAME);
writeInternal(smoosher, arrayDictionaryWriter, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
}
writeInternal(smoosher, rawWriter, RAW_FILE_NAME);
if (!nullRowsBitmap.isEmpty()) {
writeInternal(smoosher, nullBitmapWriter, NULL_BITMAP_FILE_NAME);
writeInternal(smoosher, nullBitmapWriter, ColumnSerializerUtils.NULL_BITMAP_FILE_NAME);
}

View File

@ -47,6 +47,8 @@ import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.ComplexColumnMetadata;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@ -181,7 +183,7 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer<Str
doubleDictionaryWriter.open();
rawWriter = new CompressedVariableSizedBlobColumnSerializer(
NestedCommonFormatColumnSerializer.getInternalFileName(name, RAW_FILE_NAME),
ColumnSerializerUtils.getInternalFileName(name, RAW_FILE_NAME),
segmentWriteOutMedium,
indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4
);
@ -321,8 +323,8 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer<Str
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IndexMerger.SERIALIZER_UTILS.writeString(
baos,
NestedDataComplexTypeSerde.OBJECT_MAPPER.writeValueAsString(
new NestedDataColumnMetadata(
ColumnSerializerUtils.SMILE_MAPPER.writeValueAsString(
new ComplexColumnMetadata(
ByteOrder.nativeOrder(),
indexSpec.getBitmapSerdeFactory(),
name,
@ -389,7 +391,7 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer<Str
private void writeInternal(FileSmoosher smoosher, Serializer serializer, String fileName) throws IOException
{
final String internalName = NestedCommonFormatColumnSerializer.getInternalFileName(name, fileName);
final String internalName = ColumnSerializerUtils.getInternalFileName(name, fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
serializer.writeTo(smooshChannel, smoosher);
}

View File

@ -38,6 +38,7 @@ import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.index.SimpleImmutableBitmapIndex;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nullable;
@ -79,7 +80,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColu
final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
stringDictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier(
@ -91,7 +92,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColu
final ByteBuffer longDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
longDictionarySupplier = FixedIndexed.read(
longDictionaryBuffer,
@ -102,7 +103,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColu
final ByteBuffer doubleDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
doubleDictionarySupplier = FixedIndexed.read(
doubleDictionaryBuffer,
@ -113,7 +114,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColu
final ByteBuffer arrayDictionarybuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
);
arrayDictionarySupplier = FrontCodedIntArrayIndexed.read(
arrayDictionarybuffer,
@ -125,7 +126,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColu
NestedCommonFormatColumnSerializer.RAW_FILE_NAME
);
compressedRawColumnSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
NestedCommonFormatColumnSerializer.getInternalFileName(
ColumnSerializerUtils.getInternalFileName(
columnName,
NestedCommonFormatColumnSerializer.RAW_FILE_NAME
),
@ -138,7 +139,7 @@ public class NestedDataColumnSupplier implements Supplier<NestedCommonFormatColu
final ByteBuffer nullIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.NULL_BITMAP_FILE_NAME
ColumnSerializerUtils.NULL_BITMAP_FILE_NAME
);
nullValues = bitmapSerdeFactory.getObjectStrategy().fromByteBufferWithSize(nullIndexBuffer);
} else {

View File

@ -37,6 +37,8 @@ import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.ComplexColumnMetadata;
import javax.annotation.Nullable;
import java.io.IOException;
@ -77,7 +79,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
if (version == 0x03 || version == 0x04 || version == 0x05) {
try {
final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
final NestedDataColumnMetadata metadata;
final ComplexColumnMetadata metadata;
final GenericIndexed<String> fields;
final FieldTypeInfo fieldInfo;
final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier;
@ -91,7 +93,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
metadata = jsonMapper.readValue(
IndexMerger.SERIALIZER_UTILS.readString(bb),
NestedDataColumnMetadata.class
ComplexColumnMetadata.class
);
fields = GenericIndexed.read(bb, GenericIndexed.STRING_STRATEGY, mapper);
fieldInfo = FieldTypeInfo.read(bb, fields.size());
@ -112,7 +114,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer stringDictionaryBuffer = loadInternalFile(
mapper,
metadata,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
stringDictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier(
@ -123,7 +125,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer longDictionaryBuffer = loadInternalFile(
mapper,
metadata,
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
longDictionarySupplier = FixedIndexed.read(
longDictionaryBuffer,
@ -134,7 +136,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer doubleDictionaryBuffer = loadInternalFile(
mapper,
metadata,
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
doubleDictionarySupplier = FixedIndexed.read(
doubleDictionaryBuffer,
@ -146,7 +148,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer arrayDictionarybuffer = loadInternalFile(
mapper,
metadata,
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
);
arrayDictionarySupplier = FrontCodedIntArrayIndexed.read(
arrayDictionarybuffer,
@ -157,7 +159,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
}
final ByteBuffer rawBuffer = loadInternalFile(mapper, metadata, NestedCommonFormatColumnSerializer.RAW_FILE_NAME);
compressedRawColumnSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
NestedCommonFormatColumnSerializer.getInternalFileName(
ColumnSerializerUtils.getInternalFileName(
metadata.getFileNameBase(), NestedCommonFormatColumnSerializer.RAW_FILE_NAME
),
rawBuffer,
@ -169,7 +171,7 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
final ByteBuffer nullIndexBuffer = loadInternalFile(
mapper,
metadata,
NestedCommonFormatColumnSerializer.NULL_BITMAP_FILE_NAME
ColumnSerializerUtils.NULL_BITMAP_FILE_NAME
);
nullValues = metadata.getBitmapSerdeFactory().getObjectStrategy().fromByteBufferWithSize(nullIndexBuffer);
} else {
@ -335,12 +337,12 @@ public class NestedDataColumnSupplierV4 implements Supplier<ComplexColumn>
private static ByteBuffer loadInternalFile(
SmooshedFileMapper fileMapper,
NestedDataColumnMetadata metadata,
ComplexColumnMetadata metadata,
String internalFileName
) throws IOException
{
return fileMapper.mapFile(
NestedCommonFormatColumnSerializer.getInternalFileName(metadata.getFileNameBase(), internalFileName)
ColumnSerializerUtils.getInternalFileName(metadata.getFileNameBase(), internalFileName)
);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -81,7 +82,7 @@ public final class NestedDataColumnV4<TStringDictionary extends Indexed<ByteBuff
@Override
public String getFieldFileName(String fileNameBase, String field, int fieldIndex)
{
return NestedCommonFormatColumnSerializer.getInternalFileName(
return ColumnSerializerUtils.getInternalFileName(
fileNameBase,
NestedCommonFormatColumnSerializer.NESTED_FIELD_PREFIX + fieldIndex
);

View File

@ -30,6 +30,7 @@ import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -92,7 +93,7 @@ public class NestedDataColumnV5<TStringDictionary extends Indexed<ByteBuffer>>
@Override
public String getFieldFileName(String fileNameBase, String field, int fieldIndex)
{
return NestedCommonFormatColumnSerializer.getInternalFileName(
return ColumnSerializerUtils.getInternalFileName(
fileNameBase,
NestedCommonFormatColumnSerializer.NESTED_FIELD_PREFIX + fieldIndex
);

View File

@ -20,13 +20,8 @@
package org.apache.druid.segment.nested;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.DimensionHandler;
@ -41,6 +36,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
@ -52,20 +48,8 @@ public class NestedDataComplexTypeSerde extends ComplexMetricSerde
{
public static final String TYPE_NAME = "json";
public static final ObjectMapper OBJECT_MAPPER;
public static final NestedDataComplexTypeSerde INSTANCE = new NestedDataComplexTypeSerde();
static {
final SmileFactory smileFactory = new SmileFactory();
smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false);
smileFactory.delegateToTextual(true);
final ObjectMapper mapper = new DefaultObjectMapper(smileFactory, null);
mapper.getFactory().setCodec(mapper);
mapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
OBJECT_MAPPER = mapper;
}
@Override
public String getTypeName()
{
@ -95,7 +79,7 @@ public class NestedDataComplexTypeSerde extends ComplexMetricSerde
buffer,
builder,
columnConfig,
OBJECT_MAPPER
ColumnSerializerUtils.SMILE_MAPPER
);
final ColumnCapabilitiesImpl capabilitiesBuilder = builder.getCapabilitiesBuilder();
capabilitiesBuilder.setDictionaryEncoded(true);
@ -136,7 +120,7 @@ public class NestedDataComplexTypeSerde extends ComplexMetricSerde
final byte[] bytes = new byte[numBytes];
buffer.get(bytes, 0, numBytes);
try {
return OBJECT_MAPPER.readValue(bytes, StructuredData.class);
return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes, StructuredData.class);
}
catch (IOException e) {
throw new ISE(e, "Unable to deserialize value");
@ -151,12 +135,18 @@ public class NestedDataComplexTypeSerde extends ComplexMetricSerde
return new byte[0];
}
try {
return OBJECT_MAPPER.writeValueAsBytes(val);
return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
}
catch (JsonProcessingException e) {
throw new ISE(e, "Unable to serialize value [%s]", val);
}
}
@Override
public boolean readRetainsBufferReference()
{
return false;
}
};
}

View File

@ -70,6 +70,7 @@ import org.apache.druid.segment.index.semantic.NumericRangeIndexes;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.index.semantic.ValueSetIndexes;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nonnull;
@ -105,12 +106,12 @@ public class ScalarDoubleColumnAndIndexSupplier implements Supplier<NestedCommon
final ByteBuffer doubleDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
final ByteBuffer doublesValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.DOUBLE_VALUE_COLUMN_FILE_NAME
ColumnSerializerUtils.DOUBLE_VALUE_COLUMN_FILE_NAME
);
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier = FixedIndexed.read(
@ -127,7 +128,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements Supplier<NestedCommon
final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.BITMAP_INDEX_FILE_NAME
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
);
GenericIndexed<ImmutableBitmap> rBitmaps = GenericIndexed.read(
valueIndexBuffer,

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarDoublesSerializer;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@ -129,16 +130,16 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn
@Override
protected void writeValueColumn(FileSmoosher smoosher) throws IOException
{
writeInternal(smoosher, doublesSerializer, DOUBLE_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, doublesSerializer, ColumnSerializerUtils.DOUBLE_VALUE_COLUMN_FILE_NAME);
}
@Override
protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
{
if (dictionaryIdLookup.getDoubleBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, dictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
}
}

View File

@ -69,6 +69,7 @@ import org.apache.druid.segment.index.semantic.NumericRangeIndexes;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.index.semantic.ValueSetIndexes;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nonnull;
@ -104,17 +105,17 @@ public class ScalarLongColumnAndIndexSupplier implements Supplier<NestedCommonFo
final ByteBuffer longDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
final ByteBuffer longsValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.LONG_VALUE_COLUMN_FILE_NAME
ColumnSerializerUtils.LONG_VALUE_COLUMN_FILE_NAME
);
final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.BITMAP_INDEX_FILE_NAME
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
);
GenericIndexed<ImmutableBitmap> rBitmaps = GenericIndexed.read(
valueIndexBuffer,

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarLongsSerializer;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@ -129,16 +130,16 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe
@Override
protected void writeValueColumn(FileSmoosher smoosher) throws IOException
{
writeInternal(smoosher, longsSerializer, LONG_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, longsSerializer, ColumnSerializerUtils.LONG_VALUE_COLUMN_FILE_NAME);
}
@Override
protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
{
if (dictionaryIdLookup.getLongBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, dictionaryWriter, LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
}
}

View File

@ -36,6 +36,7 @@ import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.FixedIndexedIntWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@ -220,9 +221,9 @@ public abstract class ScalarNestedCommonFormatColumnSerializer<T> extends Nested
writeV0Header(channel, columnNameBytes);
writeDictionaryFile(smoosher);
writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, encodedValueSerializer, ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME);
writeValueColumn(smoosher);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
writeInternal(smoosher, bitmapIndexWriter, ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
}

View File

@ -34,6 +34,7 @@ import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import org.apache.druid.segment.serde.StringUtf8ColumnIndexSupplier;
@ -63,7 +64,7 @@ public class ScalarStringColumnAndIndexSupplier implements Supplier<NestedCommon
final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
dictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier(
@ -74,7 +75,7 @@ public class ScalarStringColumnAndIndexSupplier implements Supplier<NestedCommon
final ByteBuffer encodedValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ENCODED_VALUE_COLUMN_FILE_NAME
ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
);
final CompressedVSizeColumnarIntsSupplier ints = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
encodedValueColumn,
@ -83,7 +84,7 @@ public class ScalarStringColumnAndIndexSupplier implements Supplier<NestedCommon
final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.BITMAP_INDEX_FILE_NAME
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
);
GenericIndexed<ImmutableBitmap> valueIndexes = GenericIndexed.read(
valueIndexBuffer,

View File

@ -30,6 +30,7 @@ import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@ -127,7 +128,7 @@ public class ScalarStringColumnSerializer extends ScalarNestedCommonFormatColumn
smoosher.add(name, fileMapper.mapFile(name));
}
} else {
writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
}
}

View File

@ -26,6 +26,7 @@ import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.column.TypeStrategies;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import javax.annotation.Nullable;
import java.util.Arrays;
@ -45,7 +46,7 @@ public class StructuredData implements Comparable<StructuredData>
private static long computeHash(StructuredData data)
{
try {
final byte[] bytes = NestedDataComplexTypeSerde.OBJECT_MAPPER.writeValueAsBytes(data.value);
final byte[] bytes = ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(data.value);
return HASH_FUNCTION.hash(bytes, 0, bytes.length, SEED);
}
catch (JsonProcessingException e) {

View File

@ -52,6 +52,7 @@ import org.apache.druid.segment.index.SimpleImmutableBitmapIndex;
import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import javax.annotation.Nonnull;
@ -97,7 +98,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
final ByteBuffer stringDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
stringDictionarySupplier = StringEncodingStrategies.getStringDictionarySupplier(
@ -108,7 +109,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
final ByteBuffer encodedValueColumn = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ENCODED_VALUE_COLUMN_FILE_NAME
ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
);
final CompressedVSizeColumnarIntsSupplier ints = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
encodedValueColumn,
@ -117,22 +118,22 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
final ByteBuffer longDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
);
final ByteBuffer doubleDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
);
final ByteBuffer arrayElementDictionaryBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
);
final ByteBuffer valueIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.BITMAP_INDEX_FILE_NAME
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME
);
final GenericIndexed<ImmutableBitmap> valueIndexes = GenericIndexed.read(
valueIndexBuffer,
@ -142,7 +143,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
final ByteBuffer elementIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME
ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME
);
final GenericIndexed<ImmutableBitmap> arrayElementIndexes = GenericIndexed.read(
elementIndexBuffer,
@ -166,7 +167,7 @@ public class VariantColumnAndIndexSupplier implements Supplier<NestedCommonForma
final ByteBuffer arrayDictionarybuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
);
arrayDictionarySupplier = FrontCodedIntArrayIndexed.read(
arrayDictionarybuffer,

View File

@ -48,6 +48,7 @@ import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@ -413,28 +414,28 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
smoosher.add(internalName, fileMapper.mapFile(internalName));
}
} else {
writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryWriter, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
if (dictionaryIdLookup.getLongBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
writeInternal(smoosher, longDictionaryWriter, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
if (dictionaryIdLookup.getDoubleBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME);
writeInternal(smoosher, doubleDictionaryWriter, ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
if (dictionaryIdLookup.getArrayBuffer() != null) {
writeInternal(smoosher, dictionaryIdLookup.getArrayBuffer(), ARRAY_DICTIONARY_FILE_NAME);
writeInternal(smoosher, dictionaryIdLookup.getArrayBuffer(), ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
} else {
writeInternal(smoosher, arrayDictionaryWriter, ARRAY_DICTIONARY_FILE_NAME);
writeInternal(smoosher, arrayDictionaryWriter, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
}
writeInternal(smoosher, arrayElementDictionaryWriter, ARRAY_ELEMENT_DICTIONARY_FILE_NAME);
writeInternal(smoosher, encodedValueSerializer, ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
writeInternal(smoosher, arrayElementIndexWriter, ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME);
writeInternal(smoosher, arrayElementDictionaryWriter, ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME);
writeInternal(smoosher, encodedValueSerializer, ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, bitmapIndexWriter, ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME);
writeInternal(smoosher, arrayElementIndexWriter, ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.serde;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
public class ColumnSerializerUtils
{
public static final String STRING_DICTIONARY_FILE_NAME = "__stringDictionary";
public static final String LONG_DICTIONARY_FILE_NAME = "__longDictionary";
public static final String DOUBLE_DICTIONARY_FILE_NAME = "__doubleDictionary";
public static final String ARRAY_DICTIONARY_FILE_NAME = "__arrayDictionary";
public static final String ARRAY_ELEMENT_DICTIONARY_FILE_NAME = "__arrayElementDictionary";
public static final String ENCODED_VALUE_COLUMN_FILE_NAME = "__encodedColumn";
public static final String LONG_VALUE_COLUMN_FILE_NAME = "__longColumn";
public static final String DOUBLE_VALUE_COLUMN_FILE_NAME = "__doubleColumn";
public static final String BITMAP_INDEX_FILE_NAME = "__valueIndexes";
public static final String ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME = "__arrayElementIndexes";
public static final String NULL_BITMAP_FILE_NAME = "__nullIndex";
public static final ObjectMapper SMILE_MAPPER;
static {
final SmileFactory smileFactory = new SmileFactory();
smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false);
smileFactory.delegateToTextual(true);
final ObjectMapper mapper = new DefaultObjectMapper(smileFactory, null);
mapper.getFactory().setCodec(mapper);
mapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
SMILE_MAPPER = mapper;
}
public static void writeInternal(FileSmoosher smoosher, Serializer serializer, String columnName, String fileName)
throws IOException
{
final String internalName = getInternalFileName(columnName, fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) {
serializer.writeTo(smooshChannel, smoosher);
}
}
public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, String columnName, String fileName)
throws IOException
{
final String internalName = getInternalFileName(columnName, fileName);
try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
smooshChannel.write(buffer);
}
}
public static String getInternalFileName(String fileNameBase, String field)
{
return fileNameBase + "." + field;
}
}
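As a quick illustration of the naming scheme this helper centralizes (sketch only, not part of the diff; "shipTo" is an arbitrary column name):

// the string dictionary for a column named "shipTo" lives at "shipTo.__stringDictionary"
String internalName = ColumnSerializerUtils.getInternalFileName(
    "shipTo",
    ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);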

View File

@ -18,7 +18,7 @@
*/
package org.apache.druid.segment.nested;
package org.apache.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -26,7 +26,7 @@ import org.apache.druid.segment.data.BitmapSerdeFactory;
import java.nio.ByteOrder;
public class NestedDataColumnMetadata
public class ComplexColumnMetadata
{
private final ByteOrder byteOrder;
private final BitmapSerdeFactory bitmapSerdeFactory;
@ -34,7 +34,7 @@ public class NestedDataColumnMetadata
private final Boolean hasNulls;
@JsonCreator
public NestedDataColumnMetadata(
public ComplexColumnMetadata(
@JsonProperty("byteOrder") ByteOrder byteOrder,
@JsonProperty("bitmapSerdeFactory") BitmapSerdeFactory bitmapSerdeFactory,
@JsonProperty("fileNameBase") String fileNameBase,

View File

@ -23,11 +23,14 @@ import com.google.common.base.Function;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@ -43,31 +46,6 @@ public abstract class ComplexMetricSerde
public abstract ComplexMetricExtractor getExtractor();
/**
* Deserializes a ByteBuffer and adds it to the ColumnBuilder. This method allows the ComplexMetricSerde
* to implement its own versioning scheme, so the binary format can change in a forward-compatible manner.
*
* @param buffer the buffer to deserialize
* @param builder ColumnBuilder to add the column to
* @param columnConfig ColumnConfiguration used during deserialization
*/
public void deserializeColumn(
ByteBuffer buffer,
ColumnBuilder builder,
@SuppressWarnings("unused") ColumnConfig columnConfig
)
{
deserializeColumn(buffer, builder);
}
/**
* {@link ComplexMetricSerde#deserializeColumn(ByteBuffer, ColumnBuilder, ColumnConfig)} should be used instead of this.
* This method is left for backward compatibility.
*/
@Deprecated
public abstract void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
/**
* This is deprecated because its usage is going to be removed from the code.
* <p>
@ -142,14 +120,100 @@ public abstract class ComplexMetricSerde
}
/**
* This method provides the ability for a ComplexMetricSerde to control its own serialization.
* For large columns (i.e. columns greater than {@link Integer#MAX_VALUE}) use
* {@link LargeColumnSupportedComplexColumnSerializer}
* Deserializes a ByteBuffer and adds it to the ColumnBuilder. This method allows for the ComplexMetricSerde
* to implement its own versioning scheme to allow for changes of binary format in a forward-compatible manner.
*
* @return an instance of GenericColumnSerializer used for serialization.
* @param buffer the buffer to deserialize
* @param builder ColumnBuilder to add the column to
* @param columnConfig ColumnConfiguration used during deserialization
*/
public void deserializeColumn(
ByteBuffer buffer,
ColumnBuilder builder,
ColumnConfig columnConfig
)
{
deserializeColumn(buffer, builder);
}
/**
* {@link ComplexMetricSerde#deserializeColumn(ByteBuffer, ColumnBuilder, ColumnConfig)} should be used instead of this.
* This method is left for backward compatibility.
*/
@Deprecated
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
// default read path, mirroring the default getSerializer implementation
final int position = buffer.position();
final byte version = buffer.get();
if (version == CompressedComplexColumnSerializer.IS_COMPRESSED) {
CompressedComplexColumnSupplier supplier = CompressedComplexColumnSupplier.read(
buffer,
builder,
getTypeName(),
getObjectStrategy()
);
builder.setComplexColumnSupplier(supplier);
builder.setNullValueIndexSupplier(supplier.getNullValues());
builder.setHasNulls(!supplier.getNullValues().isEmpty());
} else {
buffer.position(position);
builder.setComplexColumnSupplier(
new ComplexColumnPartSupplier(
getTypeName(),
GenericIndexed.read(buffer, getObjectStrategy(), builder.getFileMapper())
)
);
}
}
/**
* {@link ComplexMetricSerde#getSerializer(SegmentWriteOutMedium, String, IndexSpec)} should be used instead of this.
* This method is left for backward compatibility.
*/
@Nullable
@Deprecated
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
return ComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
return null;
}
/**
* This method provides the ability for a ComplexMetricSerde to control its own serialization.
* The default implementation uses {@link CompressedComplexColumnSerializer} if {@link IndexSpec#complexMetricCompression}
* is set to a value other than null, none, or uncompressed, and {@link LargeColumnSupportedComplexColumnSerializer}
* otherwise.
*
* @return an instance of {@link GenericColumnSerializer} used for serialization.
*/
public GenericColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec
)
{
// backwards compatibility: if the deprecated method provides a serializer, use it
final GenericColumnSerializer serializer = getSerializer(segmentWriteOutMedium, column);
if (serializer != null) {
return serializer;
}
// otherwise, use compressed or generic indexed based serializer
CompressionStrategy strategy = indexSpec.getComplexMetricCompression();
if (strategy == null || CompressionStrategy.NONE == strategy || CompressionStrategy.UNCOMPRESSED == strategy) {
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
getObjectStrategy()
);
} else {
return CompressedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
indexSpec,
getObjectStrategy()
);
}
}
}
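
A minimal sketch (all names hypothetical) of a serde for an opaque `byte[]` metric that supplies only the type-specific pieces and leans on the new defaults: with no `getSerializer`/`deserializeColumn` overrides, the base class picks `CompressedComplexColumnSerializer` when `IndexSpec.complexMetricCompression` is set, and dispatches reads on the leading `IS_COMPRESSED` byte otherwise falling back to the `GenericIndexed` layout.

```java
import com.google.common.primitives.UnsignedBytes;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;

import java.nio.ByteBuffer;

// Hypothetical serde for an opaque byte[] metric; type name and class name are
// illustrative, not part of the PR.
public class OpaqueBlobComplexMetricSerde extends ComplexMetricSerde
{
  @Override
  public String getTypeName()
  {
    return "opaqueBlob";
  }

  @Override
  public ComplexMetricExtractor getExtractor()
  {
    return new ComplexMetricExtractor()
    {
      @Override
      public Class<byte[]> extractedClass()
      {
        return byte[].class;
      }

      @Override
      public Object extractValue(InputRow inputRow, String metricName)
      {
        return inputRow.getRaw(metricName);
      }
    };
  }

  @Override
  public ObjectStrategy<byte[]> getObjectStrategy()
  {
    return new ObjectStrategy<byte[]>()
    {
      @Override
      public Class<byte[]> getClazz()
      {
        return byte[].class;
      }

      @Override
      public byte[] fromByteBuffer(ByteBuffer buffer, int numBytes)
      {
        // copies the value out of the (possibly block-compressed) buffer
        final byte[] bytes = new byte[numBytes];
        buffer.get(bytes);
        return bytes;
      }

      @Override
      public byte[] toBytes(byte[] val)
      {
        return val == null ? ByteArrays.EMPTY_ARRAY : val;
      }

      @Override
      public boolean readRetainsBufferReference()
      {
        // fromByteBuffer already copies, so CompressedComplexColumn does not
        // need to duplicate the block buffer before handing it to this strategy
        return false;
      }

      @Override
      public int compare(byte[] o1, byte[] o2)
      {
        return UnsignedBytes.lexicographicalComparator().compare(o1, o2);
      }
    };
  }
  // No getSerializer or deserializeColumn overrides: the ComplexMetricSerde
  // defaults handle both the compressed and GenericIndexed-based layouts.
}
```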

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.serde;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumn;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public final class CompressedComplexColumn implements ComplexColumn
{
private final String typeName;
private final CompressedVariableSizedBlobColumn compressedColumn;
private final ImmutableBitmap nullValues;
private final ObjectStrategy<?> objectStrategy;
public CompressedComplexColumn(
String typeName,
CompressedVariableSizedBlobColumn compressedColumn,
ImmutableBitmap nullValues,
ObjectStrategy<?> objectStrategy
)
{
this.typeName = typeName;
this.compressedColumn = compressedColumn;
this.nullValues = nullValues;
this.objectStrategy = objectStrategy;
}
@Override
public Class<?> getClazz()
{
return objectStrategy.getClazz();
}
@Override
public String getTypeName()
{
return typeName;
}
@Override
@Nullable
public Object getRowValue(int rowNum)
{
if (nullValues.get(rowNum)) {
return null;
}
final ByteBuffer valueBuffer = compressedColumn.get(rowNum);
return objectStrategy.fromByteBuffer(valueBuffer, valueBuffer.remaining());
}
@Override
public int getLength()
{
return compressedColumn.size();
}
@Override
public void close()
{
CloseableUtils.closeAndWrapExceptions(compressedColumn);
}
}
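
A short read-path sketch, assuming `holder` comes from a `QueryableIndex` whose complex metric was persisted with `complexMetricCompression` enabled; the helper below is hypothetical. Each `getRowValue` call consults the null bitmap first and only decompresses and decodes non-null rows through the `ObjectStrategy`.

```java
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ComplexColumn;

import java.io.IOException;

final class CompressedComplexColumnReadExample
{
  // Hypothetical helper, not part of the PR: counts non-null rows in a column.
  static int countNonNullRows(ColumnHolder holder) throws IOException
  {
    try (ComplexColumn column = (ComplexColumn) holder.getColumn()) {
      int nonNull = 0;
      for (int row = 0; row < column.getLength(); row++) {
        // rows present in the null bitmap return null without touching the
        // compressed blob column
        if (column.getRowValue(row) != null) {
          nonNull++;
        }
      }
      return nonNull;
    }
  }
}
```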

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.serde;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.ByteBufferWriter;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
public class CompressedComplexColumnSerializer<T> implements GenericColumnSerializer<T>
{
public static final byte IS_COMPRESSED = Byte.MAX_VALUE;
public static final byte V0 = 0x00;
public static final String FILE_NAME = "__complexColumn";
public static GenericColumnSerializer create(
SegmentWriteOutMedium segmentWriteOutMedium,
String column,
IndexSpec indexSpec,
ObjectStrategy objectStrategy
)
{
return new CompressedComplexColumnSerializer(column, segmentWriteOutMedium, indexSpec, objectStrategy);
}
public CompressedComplexColumnSerializer(
String name,
SegmentWriteOutMedium segmentWriteOutMedium,
IndexSpec indexSpec,
ObjectStrategy<T> strategy
)
{
this.name = name;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.strategy = strategy;
}
private final String name;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final IndexSpec indexSpec;
private final ObjectStrategy<T> strategy;
private CompressedVariableSizedBlobColumnSerializer writer;
private ByteBufferWriter<ImmutableBitmap> nullBitmapWriter;
private MutableBitmap nullRowsBitmap;
private int rowCount = 0;
private boolean closedForWrite = false;
private byte[] metadataBytes;
@Override
public void open() throws IOException
{
writer = new CompressedVariableSizedBlobColumnSerializer(
ColumnSerializerUtils.getInternalFileName(name, FILE_NAME),
segmentWriteOutMedium,
indexSpec.getComplexMetricCompression()
);
writer.open();
nullBitmapWriter = new ByteBufferWriter<>(
segmentWriteOutMedium,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
);
nullBitmapWriter.open();
nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
}
@Override
public void serialize(ColumnValueSelector<? extends T> selector) throws IOException
{
final T data = selector.getObject();
if (data == null) {
nullRowsBitmap.add(rowCount);
}
rowCount++;
final byte[] bytes = strategy.toBytes(data);
writer.addValue(bytes);
}
@Override
public long getSerializedSize() throws IOException
{
closeForWrite();
// IS_COMPRESSED byte + V0 byte + metadata
return 1 + 1 + metadataBytes.length;
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
Preconditions.checkState(closedForWrite, "Not closed yet!");
channel.write(ByteBuffer.wrap(new byte[]{IS_COMPRESSED}));
channel.write(ByteBuffer.wrap(new byte[]{V0}));
channel.write(ByteBuffer.wrap(metadataBytes));
ColumnSerializerUtils.writeInternal(smoosher, writer, name, FILE_NAME);
if (!nullRowsBitmap.isEmpty()) {
ColumnSerializerUtils.writeInternal(
smoosher,
nullBitmapWriter,
name,
ColumnSerializerUtils.NULL_BITMAP_FILE_NAME
);
}
}
private void closeForWrite() throws IOException
{
if (!closedForWrite) {
closedForWrite = true;
nullBitmapWriter.write(nullRowsBitmap);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IndexMerger.SERIALIZER_UTILS.writeString(
baos,
ColumnSerializerUtils.SMILE_MAPPER.writeValueAsString(
new ComplexColumnMetadata(
ByteOrder.nativeOrder(),
indexSpec.getBitmapSerdeFactory(),
name,
!nullRowsBitmap.isEmpty()
)
)
);
this.metadataBytes = baos.toByteArray();
}
}
}
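
A hedged persist-side sketch of how this serializer gets selected and opened; the metric name and helper method are illustrative, and the per-row `serialize` calls plus the final `getSerializedSize`/`writeTo` are driven by the surrounding merge code.

```java
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.io.IOException;

final class CompressedComplexSerializerExample
{
  // Hypothetical helper: resolves and opens a serializer for one complex metric.
  static GenericColumnSerializer openSerializer(ComplexMetricSerde serde, SegmentWriteOutMedium medium)
      throws IOException
  {
    final IndexSpec indexSpec = IndexSpec.builder()
        .withComplexMetricCompression(CompressionStrategy.LZ4)
        .build();
    // LZ4 is neither null, NONE, nor UNCOMPRESSED, so the default
    // ComplexMetricSerde.getSerializer resolves to CompressedComplexColumnSerializer
    final GenericColumnSerializer serializer = serde.getSerializer(medium, "exampleMetric", indexSpec);
    serializer.open();
    // the caller then serializes one value per row and finally calls
    // getSerializedSize()/writeTo(channel, smoosher), which write the
    // IS_COMPRESSED + V0 header and metadata inline and smoosh the blob data
    // and null bitmap as internal files
    return serializer;
  }
}
```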

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.serde;
import com.google.common.base.Supplier;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
import org.apache.druid.segment.data.ObjectStrategy;
import java.io.IOException;
import java.nio.ByteBuffer;
public class CompressedComplexColumnSupplier implements Supplier<ComplexColumn>
{
public static CompressedComplexColumnSupplier read(
ByteBuffer bb,
ColumnBuilder columnBuilder,
String typeName,
ObjectStrategy objectStrategy
)
{
final byte version = bb.get();
if (version == CompressedComplexColumnSerializer.V0) {
try {
final ComplexColumnMetadata metadata = ColumnSerializerUtils.SMILE_MAPPER.readValue(
IndexMerger.SERIALIZER_UTILS.readString(bb),
ComplexColumnMetadata.class
);
final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
final CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier;
final ImmutableBitmap nullValues;
final ByteBuffer fileBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
metadata.getFileNameBase(),
CompressedComplexColumnSerializer.FILE_NAME
);
compressedColumnSupplier = CompressedVariableSizedBlobColumnSupplier.fromByteBuffer(
ColumnSerializerUtils.getInternalFileName(
metadata.getFileNameBase(),
CompressedComplexColumnSerializer.FILE_NAME
),
fileBuffer,
metadata.getByteOrder(),
objectStrategy.readRetainsBufferReference(),
mapper
);
if (metadata.hasNulls()) {
columnBuilder.setHasNulls(true);
final ByteBuffer nullIndexBuffer = NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
metadata.getFileNameBase(),
ColumnSerializerUtils.NULL_BITMAP_FILE_NAME
);
nullValues = metadata.getBitmapSerdeFactory().getObjectStrategy().fromByteBufferWithSize(nullIndexBuffer);
} else {
nullValues = metadata.getBitmapSerdeFactory().getBitmapFactory().makeEmptyImmutableBitmap();
}
return new CompressedComplexColumnSupplier(typeName, objectStrategy, compressedColumnSupplier, nullValues);
}
catch (IOException ex) {
throw new RE(ex, "Failed to deserialize V%s column.", version);
}
}
throw new RE("Unknown version " + version);
}
private final String typeName;
private final ObjectStrategy objectStrategy;
private final CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier;
private final ImmutableBitmap nullValues;
private CompressedComplexColumnSupplier(
String typeName,
ObjectStrategy objectStrategy,
CompressedVariableSizedBlobColumnSupplier compressedColumnSupplier,
ImmutableBitmap nullValues
)
{
this.typeName = typeName;
this.objectStrategy = objectStrategy;
this.compressedColumnSupplier = compressedColumnSupplier;
this.nullValues = nullValues;
}
@Override
public ComplexColumn get()
{
return new CompressedComplexColumn(typeName, compressedColumnSupplier.get(), nullValues, objectStrategy);
}
public ImmutableBitmap getNullValues()
{
return nullValues;
}
}

View File

@ -32,7 +32,6 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.nested.NestedCommonFormatColumn;
import org.apache.druid.segment.nested.NestedCommonFormatColumnSerializer;
import org.apache.druid.segment.nested.NestedDataColumnSupplier;
import org.apache.druid.segment.nested.ScalarDoubleColumnAndIndexSupplier;
import org.apache.druid.segment.nested.ScalarLongColumnAndIndexSupplier;
@ -70,7 +69,7 @@ public class NestedCommonFormatColumnPartSerde implements ColumnPartSerde
) throws IOException
{
return fileMapper.mapFile(
NestedCommonFormatColumnSerializer.getInternalFileName(fileNameBase, internalFileName)
ColumnSerializerUtils.getInternalFileName(fileNameBase, internalFileName)
);
}

View File

@ -445,6 +445,10 @@ public class QueryRunnerTestHelper
maker.apply(
"frontCodedMMappedTestIndex",
new QueryableIndexSegment(TestIndex.getFrontCodedMMappedTestIndex(), SEGMENT_ID)
),
maker.apply(
"mMappedTestIndexCompressedComplex",
new QueryableIndexSegment(TestIndex.getMMappedTestIndexCompressedComplex(), SEGMENT_ID)
)
)
);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ComplexColumn;
@ -147,7 +148,7 @@ public class SerializablePairLongDoubleComplexMetricSerdeTest
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer(
expected,
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"),
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used", IndexSpec.DEFAULT),
expectedCompressedSize
).asReadOnlyBuffer();

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ComplexColumn;
@ -147,7 +148,7 @@ public class SerializablePairLongFloatComplexMetricSerdeTest
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer(
expected,
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"),
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used", IndexSpec.DEFAULT),
expectedCompressedSize
).asReadOnlyBuffer();

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ComplexColumn;
@ -147,7 +148,7 @@ public class SerializablePairLongLongComplexMetricSerdeTest
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer(
expected,
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"),
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used", IndexSpec.DEFAULT),
expectedCompressedSize
).asReadOnlyBuffer();

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ComplexColumn;
@ -152,12 +153,12 @@ public class SerializablePairLongStringComplexMetricSerdeTest
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
ByteBuffer legacyBuffer = serializeAllValuesToByteBuffer(
expected,
LEGACY_SERDE.getSerializer(writeOutMedium, "not-used"),
LEGACY_SERDE.getSerializer(writeOutMedium, "not-used", IndexSpec.DEFAULT),
expectedLegacySize
).asReadOnlyBuffer();
ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer(
expected,
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"),
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used", IndexSpec.DEFAULT),
expectedCompressedSize
).asReadOnlyBuffer();

View File

@ -51,6 +51,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.StringEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.FrontCodedIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@ -195,6 +196,13 @@ public class TestIndex
private static Supplier<QueryableIndex> mmappedIndex = Suppliers.memoize(
() -> persistRealtimeAndLoadMMapped(realtimeIndex.get())
);
private static Supplier<QueryableIndex> mmappedIndexCompressedComplex = Suppliers.memoize(
() -> persistRealtimeAndLoadMMapped(
realtimeIndex.get(),
IndexSpec.builder().withComplexMetricCompression(CompressionStrategy.LZ4).build()
)
);
private static Supplier<QueryableIndex> nonTimeOrderedMmappedIndex = Suppliers.memoize(
() -> persistRealtimeAndLoadMMapped(nonTimeOrderedRealtimeIndex.get())
);
@ -324,6 +332,11 @@ public class TestIndex
return frontCodedMmappedIndex.get();
}
public static QueryableIndex getMMappedTestIndexCompressedComplex()
{
return mmappedIndexCompressedComplex.get();
}
public static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
{
return makeRealtimeIndex(resourceFilename, true);

View File

@ -92,7 +92,7 @@ public class CompressedVariableSizeBlobColumnTest
).get();
for (int row = 0; row < numWritten; row++) {
ByteBuffer value = column.get(row);
byte[] bytes = new byte[value.limit()];
byte[] bytes = new byte[value.remaining()];
value.get(bytes);
Assert.assertArrayEquals("Row " + row, values.get(row), bytes);
}

View File

@ -61,6 +61,7 @@ import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.serde.ColumnPartSerde;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.ComplexColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@ -247,7 +248,7 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest
baseBuffer,
bob,
ColumnConfig.SELECTION_SIZE,
NestedDataComplexTypeSerde.OBJECT_MAPPER
ColumnSerializerUtils.SMILE_MAPPER
);
final String expectedReason = "none";
final AtomicReference<String> failureReason = new AtomicReference<>(expectedReason);

View File

@ -24,11 +24,9 @@ import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.nio.ByteBuffer;
import java.util.Comparator;
@ -141,15 +139,4 @@ public class HyperUniquesSerdeForTest extends ComplexMetricSerde
}
};
}
@Override
public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String metric)
{
return LargeColumnSupportedComplexColumnSerializer.createWithColumnSize(
segmentWriteOutMedium,
metric,
this.getObjectStrategy(),
Integer.MAX_VALUE
);
}
}

View File

@ -299,6 +299,7 @@ colocation
colocating
compactable
compactionTask
complexMetricCompression
config
configs
consumerProperties