diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java index 98a7ab51f98..e3699ea3db8 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java @@ -45,7 +45,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier CompressionStrategy strategy ) { - baseLongBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(order, strategy)); + baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy)); this.totalSize = totalSize; this.sizePer = sizePer; this.baseReader = reader; diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java index e685721eb88..88e78f79808 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java @@ -125,7 +125,7 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier> { + /** + * Cache strategies in a static, because there are not very many distinct ones -- there are only so many combinations + * of byte order and decompressor that we can possibly have -- and we need one of these per GenericIndexed, which + * is a class that we tend to have tons of in heap. + */ + private static final ConcurrentHashMap, DecompressingByteBufferObjectStrategy> STRATEGIES = + new ConcurrentHashMap<>(); + private final ByteOrder order; private final CompressionStrategy.Decompressor decompressor; - DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy compression) + private DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy compression) { this.order = order; this.decompressor = compression.getDecompressor(); } + public static DecompressingByteBufferObjectStrategy of( + final ByteOrder order, + final CompressionStrategy compression + ) + { + return STRATEGIES.computeIfAbsent( + Pair.of(order, compression), + pair -> new DecompressingByteBufferObjectStrategy(pair.lhs, pair.rhs) + ); + } + @Override @SuppressWarnings("unchecked") public Class> getClazz() diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java index 11a8b61531b..a87dcda09b5 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java @@ -82,7 +82,7 @@ import java.util.Iterator; * * @see GenericIndexedWriter */ -public class GenericIndexed implements CloseableIndexed, Serializer +public abstract class GenericIndexed implements CloseableIndexed, Serializer { static final byte VERSION_ONE = 0x1; static final byte VERSION_TWO = 0x2; @@ -91,12 +91,6 @@ public class GenericIndexed implements CloseableIndexed, Serializer static final int NULL_VALUE_SIZE_MARKER = -1; - private static final MetaSerdeHelper META_SERDE_HELPER = MetaSerdeHelper - .firstWriteByte((GenericIndexed x) -> VERSION_ONE) - .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED) - .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES)) - .writeInt(x -> x.size); - private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); /** @@ -220,7 +214,7 @@ public class GenericIndexed implements CloseableIndexed, Serializer buffers, GenericIndexedWriter.compressedByteBuffersWriteObjectStrategy(compression, bufferSize, closer), false, - new DecompressingByteBufferObjectStrategy(order, compression) + DecompressingByteBufferObjectStrategy.of(order, compression) ); } @@ -238,75 +232,243 @@ public class GenericIndexed implements CloseableIndexed, Serializer return numberOfFilesRequired; } + protected final ObjectStrategy strategy; + protected final boolean allowReverseLookup; + protected final int size; - private final boolean versionOne; - - private final ObjectStrategy strategy; - private final boolean allowReverseLookup; - private final int size; - private final ByteBuffer headerBuffer; - - private final ByteBuffer firstValueBuffer; - - private final ByteBuffer[] valueBuffers; - private int logBaseTwoOfElementsPerValueFile; - private int relativeIndexMask; - - @Nullable - private final ByteBuffer theBuffer; - - /** - * Constructor for version one. - */ - GenericIndexed( - ByteBuffer buffer, - ObjectStrategy strategy, - boolean allowReverseLookup + public GenericIndexed( + final ObjectStrategy strategy, + final boolean allowReverseLookup, + final int size ) { - this.versionOne = true; - - this.theBuffer = buffer; this.strategy = strategy; this.allowReverseLookup = allowReverseLookup; - size = theBuffer.getInt(); - - int indexOffset = theBuffer.position(); - int valuesOffset = theBuffer.position() + size * Integer.BYTES; - - buffer.position(valuesOffset); - // Ensure the value buffer's limit equals to capacity. - firstValueBuffer = buffer.slice(); - valueBuffers = new ByteBuffer[]{firstValueBuffer}; - buffer.position(indexOffset); - headerBuffer = buffer.slice(); + this.size = size; } + public abstract BufferIndexed singleThreaded(); - /** - * Constructor for version two. - */ - GenericIndexed( - ByteBuffer[] valueBuffs, - ByteBuffer headerBuff, - ObjectStrategy strategy, - boolean allowReverseLookup, - int logBaseTwoOfElementsPerValueFile, - int numWritten - ) + @Override + public abstract long getSerializedSize(); + + private static final class V1 extends GenericIndexed { - this.versionOne = false; + @SuppressWarnings("rawtypes") + private static final MetaSerdeHelper META_SERDE_HELPER = MetaSerdeHelper + .firstWriteByte((GenericIndexed.V1 x) -> VERSION_ONE) + .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED) + .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES)) + .writeInt(x -> x.size); - this.theBuffer = null; - this.strategy = strategy; - this.allowReverseLookup = allowReverseLookup; - this.valueBuffers = valueBuffs; - this.firstValueBuffer = valueBuffers[0]; - this.headerBuffer = headerBuff; - this.size = numWritten; - this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; - this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1; - headerBuffer.order(ByteOrder.nativeOrder()); + private final ByteBuffer theBuffer; + private final int headerOffset; + private final int valuesOffset; + + V1( + final ByteBuffer buffer, + final ObjectStrategy strategy, + final boolean allowReverseLookup + ) + { + super(strategy, allowReverseLookup, buffer.getInt()); + this.theBuffer = buffer; + this.headerOffset = theBuffer.position(); + this.valuesOffset = theBuffer.position() + size * Integer.BYTES; + } + + @Nullable + @Override + public T get(int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + if (index == 0) { + startOffset = Integer.BYTES; + endOffset = theBuffer.getInt(headerOffset); + } else { + int headerPosition = (index - 1) * Integer.BYTES; + startOffset = theBuffer.getInt(headerOffset + headerPosition) + Integer.BYTES; + endOffset = theBuffer.getInt(headerOffset + headerPosition + Integer.BYTES); + } + return copyBufferAndGet(theBuffer, valuesOffset + startOffset, valuesOffset + endOffset); + } + + @Override + public BufferIndexed singleThreaded() + { + final ByteBuffer copyBuffer = theBuffer.asReadOnlyBuffer(); + return new BufferIndexed() + { + @Nullable + @Override + protected ByteBuffer getByteBuffer(final int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + if (index == 0) { + startOffset = Integer.BYTES; + endOffset = theBuffer.getInt(headerOffset); + } else { + int headerPosition = (index - 1) * Integer.BYTES; + startOffset = theBuffer.getInt(headerOffset + headerPosition) + Integer.BYTES; + endOffset = theBuffer.getInt(headerOffset + headerPosition + Integer.BYTES); + } + return bufferedIndexedGetByteBuffer(copyBuffer, valuesOffset + startOffset, valuesOffset + endOffset); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("theBuffer", theBuffer); + inspector.visit("copyBuffer", copyBuffer); + inspector.visit("strategy", strategy); + } + }; + } + + @Override + public long getSerializedSize() + { + return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining(); + } + + @Override + public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException + { + META_SERDE_HELPER.writeTo(channel, this); + Channels.writeFully(channel, theBuffer.asReadOnlyBuffer()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("theBuffer", theBuffer); + inspector.visit("strategy", strategy); + } + } + + private static final class V2 extends GenericIndexed + { + private final ByteBuffer headerBuffer; + private final ByteBuffer[] valueBuffers; + private final int logBaseTwoOfElementsPerValueFile; + private final int relativeIndexMask; + + private V2( + ByteBuffer[] valueBuffs, + ByteBuffer headerBuff, + ObjectStrategy strategy, + boolean allowReverseLookup, + int logBaseTwoOfElementsPerValueFile, + int numWritten + ) + { + super(strategy, allowReverseLookup, numWritten); + this.valueBuffers = valueBuffs; + this.headerBuffer = headerBuff; + this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; + this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1; + headerBuffer.order(ByteOrder.nativeOrder()); + } + + @Nullable + @Override + public T get(int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + int relativePositionOfIndex = index & relativeIndexMask; + if (relativePositionOfIndex == 0) { + int headerPosition = index * Integer.BYTES; + startOffset = Integer.BYTES; + endOffset = headerBuffer.getInt(headerPosition); + } else { + int headerPosition = (index - 1) * Integer.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); + } + int fileNum = index >> logBaseTwoOfElementsPerValueFile; + return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset); + } + + @Override + public BufferIndexed singleThreaded() + { + final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length]; + for (int i = 0; i < valueBuffers.length; i++) { + copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer(); + } + + return new BufferIndexed() + { + @Nullable + @Override + protected ByteBuffer getByteBuffer(int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + int relativePositionOfIndex = index & relativeIndexMask; + if (relativePositionOfIndex == 0) { + int headerPosition = index * Integer.BYTES; + startOffset = 4; + endOffset = headerBuffer.getInt(headerPosition); + } else { + int headerPosition = (index - 1) * Integer.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); + } + int fileNum = index >> logBaseTwoOfElementsPerValueFile; + return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum], startOffset, endOffset); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("headerBuffer", headerBuffer); + // Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers + // in it are the same. + inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null); + inspector.visit("strategy", strategy); + } + }; + } + + @Override + public long getSerializedSize() + { + throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed."); + } + + @Override + public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) + { + throw new UnsupportedOperationException( + "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead."); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("headerBuffer", headerBuffer); + + // Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it + // are the same. + inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null); + inspector.visit("strategy", strategy); + } } /** @@ -317,7 +479,7 @@ public class GenericIndexed implements CloseableIndexed, Serializer * * @param index index identifying an element of an GenericIndexed. */ - private void checkIndex(int index) + protected void checkIndex(int index) { if (index < 0) { throw new IAE("Index[%s] < 0", index); @@ -338,12 +500,6 @@ public class GenericIndexed implements CloseableIndexed, Serializer return size; } - @Override - public T get(int index) - { - return versionOne ? getVersionOne(index) : getVersionTwo(index); - } - /** * Returns the index of "value" in this GenericIndexed object, or (-(insertion point) - 1) if the value is not * present, in the manner of Arrays.binarySearch. This strengthens the contract of Indexed, which only guarantees @@ -393,38 +549,8 @@ public class GenericIndexed implements CloseableIndexed, Serializer return IndexedIterable.create(this).iterator(); } - @Override - public long getSerializedSize() - { - if (!versionOne) { - throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed."); - } - return getSerializedSizeVersionOne(); - } - - @Override - public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException - { - if (versionOne) { - writeToVersionOne(channel); - } else { - throw new UnsupportedOperationException( - "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead."); - } - } - - /** - * Create a non-thread-safe Indexed, which may perform better than the underlying Indexed. - * - * @return a non-thread-safe Indexed - */ - public GenericIndexed.BufferIndexed singleThreaded() - { - return versionOne ? singleThreadedVersionOne() : singleThreadedVersionTwo(); - } - @Nullable - private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset) + protected T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset) { ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer(); int size = endOffset - startOffset; @@ -439,21 +565,6 @@ public class GenericIndexed implements CloseableIndexed, Serializer return strategy.fromByteBuffer(copyValueBuffer, size); } - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("versionOne", versionOne); - inspector.visit("headerBuffer", headerBuffer); - if (versionOne) { - inspector.visit("firstValueBuffer", firstValueBuffer); - } else { - // Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it - // are the same. - inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null); - } - inspector.visit("strategy", strategy); - } - /** * Single-threaded view. */ @@ -567,10 +678,6 @@ public class GenericIndexed implements CloseableIndexed, Serializer // nothing to close } - /////////////// - // VERSION ONE - /////////////// - private static GenericIndexed createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy strategy) { boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; @@ -579,7 +686,7 @@ public class GenericIndexed implements CloseableIndexed, Serializer bufferToUse.limit(bufferToUse.position() + size); byteBuffer.position(bufferToUse.limit()); - return new GenericIndexed<>( + return new GenericIndexed.V1<>( bufferToUse, strategy, allowReverseLookup @@ -597,7 +704,7 @@ public class GenericIndexed implements CloseableIndexed, Serializer if (!objects.hasNext()) { final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0); buffer.flip(); - return new GenericIndexed<>(buffer, resultObjectStrategy, allowReverseLookup); + return new GenericIndexed.V1<>(buffer, resultObjectStrategy, allowReverseLookup); } int count = 0; @@ -642,79 +749,9 @@ public class GenericIndexed implements CloseableIndexed, Serializer valuesOut.writeTo(theBuffer); theBuffer.flip(); - return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup); + return new GenericIndexed.V1<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup); } - private long getSerializedSizeVersionOne() - { - return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining(); - } - - @Nullable - private T getVersionOne(int index) - { - checkIndex(index); - - final int startOffset; - final int endOffset; - - if (index == 0) { - startOffset = Integer.BYTES; - endOffset = headerBuffer.getInt(0); - } else { - int headerPosition = (index - 1) * Integer.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; - endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); - } - return copyBufferAndGet(firstValueBuffer, startOffset, endOffset); - } - - private BufferIndexed singleThreadedVersionOne() - { - final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer(); - return new BufferIndexed() - { - @Nullable - @Override - protected ByteBuffer getByteBuffer(final int index) - { - checkIndex(index); - - final int startOffset; - final int endOffset; - - if (index == 0) { - startOffset = Integer.BYTES; - endOffset = headerBuffer.getInt(0); - } else { - int headerPosition = (index - 1) * Integer.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; - endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); - } - return bufferedIndexedGetByteBuffer(copyBuffer, startOffset, endOffset); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("headerBuffer", headerBuffer); - inspector.visit("copyBuffer", copyBuffer); - inspector.visit("strategy", strategy); - } - }; - } - - private void writeToVersionOne(WritableByteChannel channel) throws IOException - { - META_SERDE_HELPER.writeTo(channel, this); - Channels.writeFully(channel, theBuffer.asReadOnlyBuffer()); - } - - - /////////////// - // VERSION TWO - /////////////// - private static GenericIndexed createGenericIndexedVersionTwo( ByteBuffer byteBuffer, ObjectStrategy strategy, @@ -739,7 +776,7 @@ public class GenericIndexed implements CloseableIndexed, Serializer valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer(); } ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName)); - return new GenericIndexed<>( + return new GenericIndexed.V2<>( valueBuffersToUse, headerBuffer, strategy, @@ -752,70 +789,4 @@ public class GenericIndexed implements CloseableIndexed, Serializer throw new RuntimeException("File mapping failed.", e); } } - - @Nullable - private T getVersionTwo(int index) - { - checkIndex(index); - - final int startOffset; - final int endOffset; - - int relativePositionOfIndex = index & relativeIndexMask; - if (relativePositionOfIndex == 0) { - int headerPosition = index * Integer.BYTES; - startOffset = Integer.BYTES; - endOffset = headerBuffer.getInt(headerPosition); - } else { - int headerPosition = (index - 1) * Integer.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; - endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); - } - int fileNum = index >> logBaseTwoOfElementsPerValueFile; - return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset); - } - - private BufferIndexed singleThreadedVersionTwo() - { - final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length]; - for (int i = 0; i < valueBuffers.length; i++) { - copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer(); - } - - return new BufferIndexed() - { - @Nullable - @Override - protected ByteBuffer getByteBuffer(int index) - { - checkIndex(index); - - final int startOffset; - final int endOffset; - - int relativePositionOfIndex = index & relativeIndexMask; - if (relativePositionOfIndex == 0) { - int headerPosition = index * Integer.BYTES; - startOffset = 4; - endOffset = headerBuffer.getInt(headerPosition); - } else { - int headerPosition = (index - 1) * Integer.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; - endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); - } - int fileNum = index >> logBaseTwoOfElementsPerValueFile; - return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum], startOffset, endOffset); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("headerBuffer", headerBuffer); - // Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers - // in it are the same. - inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null); - inspector.visit("strategy", strategy); - } - }; - } }