Persist legacy LatestPairs for now (#13378)

We added compression to the latest/first pair storage, but
the change forced all newly persisted data into the new
format, meaning that any segment created with the new code
cannot be read by the old code. Instead, we need to default
to writing the old format, and then remove that default in
a future version.
imply-cheddar, 2022-11-18 01:07:02 +09:00 (committed by GitHub)
parent 8e9e46b519
commit 6b9344cd39
12 changed files with 219 additions and 116 deletions
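
In effect, the pattern here is the standard one for on-disk format migrations: the read path dispatches on a stored version byte so it understands every format ever written, while the write path is gated on a flag that defaults to the legacy format until a future release flips it. A minimal, self-contained sketch of that shape (the class, the property name "sketch.compressed", and the payload handling are invented for illustration; the real classes appear in the diffs that follow):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class VersionGatedSerdeSketch
{
  static final byte LEGACY_VERSION = 2; // legacy versions in the diff are 0, 1, 2
  static final byte NEW_VERSION = 3;    // EXPECTED_VERSION in the diff

  // Defaults to false, mirroring druid.columns.pairLongString.compressed below.
  static final boolean COMPRESSION_ENABLED =
      Boolean.parseBoolean(System.getProperty("sketch.compressed", "false"));

  static byte[] write(String value)
  {
    byte[] payload = value.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
    // Write the legacy version unless compression was explicitly enabled,
    // so segments stay readable by the previous release.
    buf.put(COMPRESSION_ENABLED ? NEW_VERSION : LEGACY_VERSION);
    buf.put(payload);
    return buf.array();
  }

  static String read(byte[] bytes)
  {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    byte version = buf.get();
    // The read path must accept every version ever written; the real code
    // parses each version with a different layout, elided here.
    if (version != LEGACY_VERSION && version != NEW_VERSION) {
      throw new IllegalArgumentException("Unknown version[" + version + "]");
    }
    byte[] payload = new byte[buf.remaining()];
    buf.get(payload);
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    System.out.println(read(write("fuu"))); // prints "fuu"
  }
}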

CompressedBigDecimalColumn.java

@@ -30,7 +30,6 @@ import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.ReadableOffset;
import javax.annotation.Nullable;
import java.io.IOException;
/**
@@ -40,6 +39,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn
{
public static final Logger LOGGER = new Logger(CompressedBigDecimalColumn.class);
private final int length;
private final ColumnarInts scale;
private final ColumnarMultiInts magnitude;
@@ -49,8 +49,9 @@ public class CompressedBigDecimalColumn implements ComplexColumn
* @param scale scale of the rows
* @param magnitude LongColumn representing magnitudes
*/
public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts magnitude)
public CompressedBigDecimalColumn(int length, ColumnarInts scale, ColumnarMultiInts magnitude)
{
this.length = length;
this.scale = scale;
this.magnitude = magnitude;
}
@@ -87,7 +88,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn
@Override
public int getLength()
{
return scale.size();
return length;
}
@Override

CompressedBigDecimalColumnPartSupplier.java

@@ -26,6 +26,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier;
import java.nio.ByteBuffer;
/**
@@ -33,27 +34,8 @@ import java.nio.ByteBuffer;
*/
public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexColumn>
{
public static final int VERSION = 0x1;
private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;
/**
* Constructor.
*
* @param scaleSupplier scale supplier
* @param magnitudeSupplier magnitude supplier
*/
public CompressedBigDecimalColumnPartSupplier(
CompressedVSizeColumnarIntsSupplier scaleSupplier,
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
)
{
this.scaleSupplier = scaleSupplier;
this.magnitudeSupplier = magnitudeSupplier;
}
/**
* Compressed.
*
@@ -67,23 +49,50 @@ public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexC
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
int positionStart = buffer.position();
CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
buffer,
IndexIO.BYTE_ORDER);
IndexIO.BYTE_ORDER
);
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER);
return new CompressedBigDecimalColumnPartSupplier(scaleSupplier, magnitudeSupplier);
return new CompressedBigDecimalColumnPartSupplier(
buffer.position() - positionStart,
scaleSupplier,
magnitudeSupplier
);
} else {
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
}
private final int byteSize;
private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;
/**
* Constructor.
*
* @param scaleSupplier scale supplier
* @param magnitudeSupplier magnitude supplier
*/
public CompressedBigDecimalColumnPartSupplier(
int byteSize,
CompressedVSizeColumnarIntsSupplier scaleSupplier,
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
)
{
this.byteSize = byteSize;
this.scaleSupplier = scaleSupplier;
this.magnitudeSupplier = magnitudeSupplier;
}
@Override
public ComplexColumn get()
{
return new CompressedBigDecimalColumn(scaleSupplier.get(), magnitudeSupplier.get());
return new CompressedBigDecimalColumn(byteSize, scaleSupplier.get(), magnitudeSupplier.get());
}
}
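
The byteSize passed through to the column here is measured with a buffer-position delta: record the position before the scale and magnitude suppliers parse their sections, let the parsing advance the buffer, and take the difference. A tiny stand-alone illustration of that pattern (the two reads below are stand-ins for the real section parsers):

import java.nio.ByteBuffer;

public class PositionDeltaSize
{
  public static void main(String[] args)
  {
    ByteBuffer buffer = ByteBuffer.allocate(64);
    buffer.putLong(42L).putInt(7); // stand-in for the scale + magnitude sections
    buffer.flip();

    int positionStart = buffer.position();
    buffer.getLong();              // "parse" section 1, advancing the position
    buffer.getInt();               // "parse" section 2
    int byteSize = buffer.position() - positionStart;

    System.out.println(byteSize);  // 12: the serialized size of what was parsed
  }
}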

CompressedBigDecimalColumnTest.java

@@ -35,6 +35,7 @@ public class CompressedBigDecimalColumnTest
ColumnarInts columnarInts = EasyMock.createMock(ColumnarInts.class);
ReadableOffset readableOffset = EasyMock.createMock(ReadableOffset.class);
CompressedBigDecimalColumn compressedBigDecimalColumn = new CompressedBigDecimalColumn(
12345,
columnarInts,
columnarMultiInts
);
@@ -42,7 +43,7 @@ public class CompressedBigDecimalColumnTest
CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL,
compressedBigDecimalColumn.getTypeName()
);
Assert.assertEquals(0, compressedBigDecimalColumn.getLength());
Assert.assertEquals(12345, compressedBigDecimalColumn.getLength());
Assert.assertEquals(CompressedBigDecimalColumn.class, compressedBigDecimalColumn.getClazz());
Assert.assertNotNull(compressedBigDecimalColumn.makeColumnValueSelector(readableOffset));
}

ComplexFrameColumnReader.java

@@ -175,7 +175,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader
@Override
public int getLength()
{
return frame.numRows();
return (int) frame.numBytes();
}
@Override

SerializablePairLongStringComplexMetricSerde.java

@@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
@@ -28,6 +29,7 @@ import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -49,6 +51,20 @@ import java.util.Comparator;
*/
public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricSerde
{
/**
* This is a configuration parameter to allow for turning on compression. It is a hack; it would be significantly
* better if this could be delivered via properties. The number one reason this is a hack is that it reads
* System.getProperty, which does not actually have runtime.properties files loaded into it, so this setting
* could be set in runtime.properties and this code would not see it, because that is not how it is wired up.
*
* The intent of this parameter is to let Druid 25 be released using the legacy serialization format. This
* will allow us to get code released that can *read* both the legacy and the new format. Then, in Druid 26,
* we can completely eliminate this boolean and start to only *write* the new format, at which point this
* hack of a configuration property disappears.
*/
private static final boolean COMPRESSION_ENABLED =
Boolean.parseBoolean(System.getProperty("druid.columns.pairLongString.compressed", "false"));
public static final int EXPECTED_VERSION = 3;
public static final String TYPE_NAME = "serializablePairLongString";
// Null SerializablePairLongString values are put first
@@ -60,6 +76,17 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS
private static final SerializablePairLongStringSimpleStagedSerde SERDE =
new SerializablePairLongStringSimpleStagedSerde();
private final boolean compressionEnabled;
public SerializablePairLongStringComplexMetricSerde()
{
this(COMPRESSION_ENABLED);
}
public SerializablePairLongStringComplexMetricSerde(boolean compressionEnabled)
{
this.compressionEnabled = compressionEnabled;
}
@Override
public String getTypeName()
@@ -92,7 +119,7 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS
byte version = buffer.get(buffer.position());
if (version == 0 || version == 1 || version == 2) {
GenericIndexed<?> column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
GenericIndexed<?> column = GenericIndexed.read(buffer, LEGACY_STRATEGY, columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
} else {
SerializablePairLongStringComplexColumn.Builder builder =
@@ -141,9 +168,73 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS
@Override
public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
if (compressionEnabled) {
return new SerializablePairLongStringColumnSerializer(
segmentWriteOutMedium,
NativeClearedByteBufferProvider.INSTANCE
);
} else {
return LargeColumnSupportedComplexColumnSerializer.create(
segmentWriteOutMedium,
column,
LEGACY_STRATEGY
);
}
}
private static final ObjectStrategy<SerializablePairLongString> LEGACY_STRATEGY =
new ObjectStrategy<SerializablePairLongString>()
{
@Override
public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2)
{
return COMPARATOR.compare(o1, o2);
}
@Override
public Class<? extends SerializablePairLongString> getClazz()
{
return SerializablePairLongString.class;
}
@Override
public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
long lhs = readOnlyBuffer.getLong();
int stringSize = readOnlyBuffer.getInt();
String lastString = null;
if (stringSize > 0) {
byte[] stringBytes = new byte[stringSize];
readOnlyBuffer.get(stringBytes, 0, stringSize);
lastString = StringUtils.fromUtf8(stringBytes);
}
return new SerializablePairLongString(lhs, lastString);
}
@Override
public byte[] toBytes(SerializablePairLongString val)
{
String rhsString = val.rhs;
ByteBuffer bbuf;
if (rhsString != null) {
byte[] rhsBytes = StringUtils.toUtf8(rhsString);
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
bbuf.putLong(val.lhs);
bbuf.putInt(Long.BYTES, rhsBytes.length);
bbuf.position(Long.BYTES + Integer.BYTES);
bbuf.put(rhsBytes);
} else {
bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
bbuf.putLong(val.lhs);
bbuf.putInt(Long.BYTES, 0);
}
return bbuf.array();
}
};
}
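
Given the property hack described in the JavaDoc above, there are two ways to exercise the new format today. A hedged usage sketch (the wrapper class here is invented; the property name and the boolean constructor come from the diff above, and note that the property is read once, when the serde class initializes):

import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;

public class PairSerdeFormatChoice
{
  public static void main(String[] args)
  {
    // Process-wide opt-in via the system property. Per the JavaDoc above, this
    // must arrive as a -D JVM flag; runtime.properties is not consulted. It is
    // captured in a static final, so set it before the class first loads.
    System.setProperty("druid.columns.pairLongString.compressed", "true");

    // Tests (and callers that want determinism) can instead pick the format
    // explicitly through the boolean constructor added in this commit:
    SerializablePairLongStringComplexMetricSerde legacy =
        new SerializablePairLongStringComplexMetricSerde(false); // legacy GenericIndexed layout
    SerializablePairLongStringComplexMetricSerde compressed =
        new SerializablePairLongStringComplexMetricSerde(true);  // new compressed cell layout
  }
}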

SegmentAnalyzer.java

@@ -96,7 +96,7 @@ public class SegmentAnalyzer
final StorageAdapter storageAdapter = segment.asStorageAdapter();
// get length and column names from storageAdapter
final int length = storageAdapter.getNumRows();
final int numRows = storageAdapter.getNumRows();
// Use LinkedHashMap to preserve column order.
final Map<String, ColumnAnalysis> columns = new LinkedHashMap<>();
@@ -119,13 +119,13 @@ public class SegmentAnalyzer
final int bytesPerRow =
ColumnHolder.TIME_COLUMN_NAME.equals(columnName) ? NUM_BYTES_IN_TIMESTAMP : Long.BYTES;
analysis = analyzeNumericColumn(capabilities, length, bytesPerRow);
analysis = analyzeNumericColumn(capabilities, numRows, bytesPerRow);
break;
case FLOAT:
analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT);
analysis = analyzeNumericColumn(capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT);
break;
case DOUBLE:
analysis = analyzeNumericColumn(capabilities, length, Double.BYTES);
analysis = analyzeNumericColumn(capabilities, numRows, Double.BYTES);
break;
case STRING:
if (index != null) {
@@ -136,7 +136,7 @@ public class SegmentAnalyzer
break;
case COMPLEX:
final ColumnHolder columnHolder = index != null ? index.getColumnHolder(columnName) : null;
analysis = analyzeComplexColumn(capabilities, columnHolder);
analysis = analyzeComplexColumn(capabilities, numRows, columnHolder);
break;
default:
log.warn("Unknown column type[%s].", capabilities.asTypeString());
@@ -330,6 +330,7 @@ public class SegmentAnalyzer
private ColumnAnalysis analyzeComplexColumn(
@Nullable final ColumnCapabilities capabilities,
final int numCells,
@Nullable final ColumnHolder columnHolder
)
{
@@ -362,8 +363,7 @@ public class SegmentAnalyzer
);
}
final int length = complexColumn.getLength();
for (int i = 0; i < length; ++i) {
for (int i = 0; i < numCells; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
}

ComplexColumn.java

@@ -58,7 +58,7 @@ public interface ComplexColumn extends BaseColumn
Object getRowValue(int rowNum);
/**
* @return serialized size (in bytes) of this column.
* @return serialized size (in bytes) of this column. -1 for unknown
*/
int getLength();
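
With this change, ComplexColumn.getLength() no longer doubles as a row count: implementations that cannot cheaply know their serialized size return -1, and SegmentAnalyzer above now takes the row count from StorageAdapter.getNumRows() instead. A hypothetical caller-side sketch of the new contract (class and method names here are invented):

import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ComplexColumn;

class ComplexColumnScan
{
  static void scan(ComplexColumn column, StorageAdapter adapter)
  {
    int serializedBytes = column.getLength(); // size in bytes, or -1 for unknown
    int numRows = adapter.getNumRows();       // the row count now comes from the adapter

    for (int i = 0; i < numRows; i++) {
      Object value = column.getRowValue(i);   // per-row access is unchanged
      // ... per-row analysis would go here ...
    }
  }
}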

GenericIndexedBasedComplexColumn.java

@@ -58,7 +58,7 @@ public class GenericIndexedBasedComplexColumn implements ComplexColumn
@Override
public int getLength()
{
return index.size();
return -1;
}
@Override

UnknownTypeComplexColumn.java

@@ -59,7 +59,7 @@ public class UnknownTypeComplexColumn implements ComplexColumn
@Override
public int getLength()
{
return 0;
return -1;
}
@Override

CompressedNestedDataComplexColumn.java

@@ -281,7 +281,7 @@ public final class CompressedNestedDataComplexColumn<TStringDictionary extends I
@Override
public int getLength()
{
return 0;
return -1;
}
@Override

SerializablePairLongStringComplexMetricSerdeTest.java

@@ -20,55 +20,50 @@
package org.apache.druid.query.aggregation;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.cell.RandomStringUtils;
import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicReference;
public class SerializablePairLongStringComplexMetricSerdeTest
{
private static final SerializablePairLongStringComplexMetricSerde COMPLEX_METRIC_SERDE =
static {
NullHandling.initializeForTests();
}
private static final SerializablePairLongStringComplexMetricSerde LEGACY_SERDE =
new SerializablePairLongStringComplexMetricSerde();
private static final SerializablePairLongStringComplexMetricSerde COMPRESSED_SERDE =
new SerializablePairLongStringComplexMetricSerde(true);
// want deterministic test input
private final Random random = new Random(0);
private final RandomStringUtils randomStringUtils = new RandomStringUtils(random);
private GenericColumnSerializer<SerializablePairLongString> serializer;
@SuppressWarnings("unchecked")
@Before
public void setup()
{
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
serializer = (GenericColumnSerializer<SerializablePairLongString>) COMPLEX_METRIC_SERDE.getSerializer(
writeOutMedium,
"not-used"
);
}
@Test
public void testSingle() throws Exception
{
assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 77);
assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 33, 77);
}
@Test
@@ -78,7 +73,7 @@ public class SerializablePairLongStringComplexMetricSerdeTest
assertExpected(ImmutableList.of(new SerializablePairLongString(
100L,
randomStringUtils.randomAlphanumeric(2 * 1024 * 1024)
)), 2103140);
)), 2097182, 2103140);
}
@Test
@@ -95,8 +90,7 @@ public class SerializablePairLongStringComplexMetricSerdeTest
valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringList.get(i % numStrings)));
}
//actual input bytes in naive encoding are ~10 MB
assertExpected(valueList, 1746026);
assertExpected(valueList, 10440010, 1746026);
}
@Test
@@ -109,8 +103,7 @@ public class SerializablePairLongStringComplexMetricSerdeTest
valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringValue));
}
//actual input bytes in naive encoding are ~10 MB
assertExpected(valueList, 289645);
assertExpected(valueList, 10440010, 289645);
}
@Test
@@ -122,81 +115,109 @@ public class SerializablePairLongStringComplexMetricSerdeTest
valueList.add(new SerializablePairLongString(random.nextLong(), randomStringUtils.randomAlphanumeric(1024)));
}
assertExpected(valueList, 10428975);
assertExpected(valueList, 10440010, 10428975);
}
@Test
public void testNullString() throws Exception
{
assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 74);
assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 30, 74);
}
@Test
public void testEmpty() throws Exception
{
// minimum size for empty data
assertExpected(Collections.emptyList(), 57);
assertExpected(Collections.emptyList(), 10, 57);
}
@Test
public void testSingleNull() throws Exception
{
assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 58);
assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 18, 58);
}
@Test
public void testMultipleNull() throws Exception
{
assertExpected(Arrays.asList(null, null, null, null), 59);
assertExpected(Arrays.asList(null, null, null, null), 42, 59);
}
private void assertExpected(List<SerializablePairLongString> expected) throws IOException
private ByteBuffer assertExpected(
List<SerializablePairLongString> expected,
int expectedLegacySize,
int expectedCompressedSize
) throws IOException
{
assertExpected(expected, -1);
SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
ByteBuffer legacyBuffer = serializeAllValuesToByteBuffer(
expected,
LEGACY_SERDE.getSerializer(writeOutMedium, "not-used"),
expectedLegacySize
).asReadOnlyBuffer();
ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer(
expected,
COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"),
expectedCompressedSize
).asReadOnlyBuffer();
try (ComplexColumn legacyCol = createComplexColumn(legacyBuffer);
ComplexColumn compressedCol = createComplexColumn(compressedBuffer)
) {
for (int i = 0; i < expected.size(); i++) {
Assert.assertEquals(expected.get(i), legacyCol.getRowValue(i));
Assert.assertEquals(expected.get(i), compressedCol.getRowValue(i));
}
}
return compressedBuffer;
}
private void assertExpected(List<SerializablePairLongString> expected, int expectedSize) throws IOException
{
List<SerializablePairLongStringValueSelector> valueSelectors =
expected.stream().map(SerializablePairLongStringValueSelector::new).collect(Collectors.toList());
ByteBuffer byteBuffer = serializeAllValuesToByteBuffer(valueSelectors, serializer, expectedSize);
try (SerializablePairLongStringComplexColumn complexColumn = createComplexColumn(byteBuffer)) {
for (int i = 0; i < valueSelectors.size(); i++) {
Assert.assertEquals(expected.get(i), complexColumn.getRowValue(i));
}
}
}
private SerializablePairLongStringComplexColumn createComplexColumn(ByteBuffer byteBuffer)
private ComplexColumn createComplexColumn(ByteBuffer byteBuffer)
{
ColumnBuilder builder = new ColumnBuilder();
int serializedSize = byteBuffer.remaining();
COMPLEX_METRIC_SERDE.deserializeColumn(byteBuffer, builder);
LEGACY_SERDE.deserializeColumn(byteBuffer, builder);
builder.setType(ValueType.COMPLEX);
ColumnHolder columnHolder = builder.build();
SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn();
final ComplexColumn col = (ComplexColumn) columnHolder.getColumn();
if (col instanceof SerializablePairLongStringComplexColumn) {
Assert.assertEquals(serializedSize, col.getLength());
}
Assert.assertEquals("serializablePairLongString", col.getTypeName());
Assert.assertEquals(SerializablePairLongString.class, col.getClazz());
Assert.assertEquals(serializedSize, column.getLength());
Assert.assertEquals("serializablePairLongString", column.getTypeName());
Assert.assertEquals(SerializablePairLongString.class, column.getClazz());
return column;
return col;
}
@SuppressWarnings({"unchecked", "rawtypes"})
private static ByteBuffer serializeAllValuesToByteBuffer(
Collection<SerializablePairLongStringValueSelector> valueSelectors,
GenericColumnSerializer<SerializablePairLongString> serializer,
List<SerializablePairLongString> values,
GenericColumnSerializer serializer,
int expectedSize
) throws IOException
{
serializer.open();
for (SerializablePairLongStringValueSelector valueSelector : valueSelectors) {
final AtomicReference<SerializablePairLongString> reference = new AtomicReference<>(null);
ColumnValueSelector<SerializablePairLongString> valueSelector =
new SingleObjectColumnValueSelector<SerializablePairLongString>(
SerializablePairLongString.class
)
{
@Nullable
@Override
public SerializablePairLongString getObject()
{
return reference.get();
}
};
for (SerializablePairLongString selector : values) {
reference.set(selector);
serializer.serialize(valueSelector);
}
@@ -225,13 +246,4 @@ public class SerializablePairLongStringComplexMetricSerdeTest
return byteBuffer;
}
private static class SerializablePairLongStringValueSelector
extends SingleValueColumnValueSelector<SerializablePairLongString>
{
public SerializablePairLongStringValueSelector(SerializablePairLongString value)
{
super(SerializablePairLongString.class, value);
}
}
}
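
The legacy sizes asserted above decompose neatly if read against the GenericIndexed layout: a 10-byte header, then per element a 4-byte offset plus a 4-byte value length, plus the pair payload itself (8-byte long, 4-byte string length, then the UTF-8 bytes; nothing for a null element). That decomposition is my inference from the constants, not stated in the commit, but it reproduces every legacy figure in the tests:

public class LegacySizeCheck
{
  // Assumed layout: 10-byte header + per element (4-byte offset + 4-byte
  // value length) + payload bytes.
  static int legacySize(int numValues, int totalPayloadBytes)
  {
    return 10 + numValues * (4 + 4) + totalPayloadBytes;
  }

  public static void main(String[] args)
  {
    System.out.println(legacySize(0, 0));                            // 10       testEmpty
    System.out.println(legacySize(1, 0));                            // 18       testSingleNull
    System.out.println(legacySize(4, 0));                            // 42       testMultipleNull
    System.out.println(legacySize(1, 8 + 4 + 3));                    // 33       testSingle ("fuu")
    System.out.println(legacySize(1, 8 + 4 + 2 * 1024 * 1024));      // 2097182  testLargeString
    System.out.println(legacySize(10_000, 10_000 * (8 + 4 + 1024))); // 10440010 the 10k-row, 1 KB-string tests
  }
}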

SingleValueColumnValueSelector.java → SingleObjectColumnValueSelector.java

@@ -22,17 +22,13 @@ package org.apache.druid.query.aggregation;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
public class SingleValueColumnValueSelector<T> implements ColumnValueSelector<T>
public abstract class SingleObjectColumnValueSelector<T> implements ColumnValueSelector<T>
{
private final Class<T> valueClass;
private final T value;
public SingleValueColumnValueSelector(Class<T> valueClass, T value)
public SingleObjectColumnValueSelector(Class<T> valueClass)
{
this.valueClass = valueClass;
this.value = value;
}
@Override
@@ -64,13 +60,6 @@ public class SingleValueColumnValueSelector<T> implements ColumnValueSelector<T>
return false;
}
@Nullable
@Override
public T getObject()
{
return value;
}
@Override
public Class<? extends T> classOfObject()
{