Reduce heap footprint of GenericIndexed. (#14563)

Two changes:

1) Intern DecompressingByteBufferObjectStrategy. Saves ~32 bytes per column.

2) Split GenericIndexed into GenericIndexed.V1 and GenericIndexed.V2. The
   major benefit here is isolating out the ByteBuffers that are only needed
   for V2. This saves ~80 bytes for V1 (one buffer instead of two).
This commit is contained in:
Gian Merlino 2023-07-12 08:11:41 -07:00 committed by GitHub
parent cc8b210e4c
commit 3711c0d987
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 264 additions and 272 deletions

View File

@ -45,7 +45,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
CompressionStrategy strategy CompressionStrategy strategy
) )
{ {
baseDoubleBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(byteOrder, strategy)); baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
this.totalSize = totalSize; this.totalSize = totalSize;
this.sizePer = sizePer; this.sizePer = sizePer;
} }

View File

@ -45,7 +45,7 @@ public class BlockLayoutColumnarFloatsSupplier implements Supplier<ColumnarFloat
CompressionStrategy strategy CompressionStrategy strategy
) )
{ {
baseFloatBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(byteOrder, strategy)); baseFloatBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
this.totalSize = totalSize; this.totalSize = totalSize;
this.sizePer = sizePer; this.sizePer = sizePer;
} }

View File

@ -47,7 +47,7 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
CompressionStrategy strategy CompressionStrategy strategy
) )
{ {
baseLongBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(order, strategy)); baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy));
this.totalSize = totalSize; this.totalSize = totalSize;
this.sizePer = sizePer; this.sizePer = sizePer;
this.baseReader = reader; this.baseReader = reader;

View File

@ -125,7 +125,7 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
return new CompressedColumnarIntsSupplier( return new CompressedColumnarIntsSupplier(
totalSize, totalSize,
sizePer, sizePer,
GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression)), GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression)),
compression compression
); );
} }
@ -148,7 +148,7 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
return new CompressedColumnarIntsSupplier( return new CompressedColumnarIntsSupplier(
totalSize, totalSize,
sizePer, sizePer,
GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper), GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression), mapper),
compression compression
); );
} }

View File

@ -158,7 +158,7 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
totalSize, totalSize,
sizePer, sizePer,
numBytes, numBytes,
GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression)), GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression)),
compression compression
); );
@ -186,7 +186,7 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
totalSize, totalSize,
sizePer, sizePer,
numBytes, numBytes,
GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper), GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression), mapper),
compression compression
); );

View File

@ -20,22 +20,43 @@
package org.apache.druid.segment.data; package org.apache.druid.segment.data;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.CompressedPools; import org.apache.druid.segment.CompressedPools;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.concurrent.ConcurrentHashMap;
public class DecompressingByteBufferObjectStrategy implements ObjectStrategy<ResourceHolder<ByteBuffer>> public class DecompressingByteBufferObjectStrategy implements ObjectStrategy<ResourceHolder<ByteBuffer>>
{ {
/**
* Cache strategies in a static, because there are not very many distinct ones -- there are only so many combinations
* of byte order and decompressor that we can possibly have -- and we need one of these per GenericIndexed, which
* is a class that we tend to have tons of in heap.
*/
private static final ConcurrentHashMap<Pair<ByteOrder, CompressionStrategy>, DecompressingByteBufferObjectStrategy> STRATEGIES =
new ConcurrentHashMap<>();
private final ByteOrder order; private final ByteOrder order;
private final CompressionStrategy.Decompressor decompressor; private final CompressionStrategy.Decompressor decompressor;
DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy compression) private DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy compression)
{ {
this.order = order; this.order = order;
this.decompressor = compression.getDecompressor(); this.decompressor = compression.getDecompressor();
} }
public static DecompressingByteBufferObjectStrategy of(
final ByteOrder order,
final CompressionStrategy compression
)
{
return STRATEGIES.computeIfAbsent(
Pair.of(order, compression),
pair -> new DecompressingByteBufferObjectStrategy(pair.lhs, pair.rhs)
);
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Class<ResourceHolder<ByteBuffer>> getClazz() public Class<ResourceHolder<ByteBuffer>> getClazz()

View File

@ -82,7 +82,7 @@ import java.util.Iterator;
* *
* @see GenericIndexedWriter * @see GenericIndexedWriter
*/ */
public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer public abstract class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
{ {
static final byte VERSION_ONE = 0x1; static final byte VERSION_ONE = 0x1;
static final byte VERSION_TWO = 0x2; static final byte VERSION_TWO = 0x2;
@ -91,12 +91,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
static final int NULL_VALUE_SIZE_MARKER = -1; static final int NULL_VALUE_SIZE_MARKER = -1;
private static final MetaSerdeHelper<GenericIndexed> META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((GenericIndexed x) -> VERSION_ONE)
.writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED)
.writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES))
.writeInt(x -> x.size);
private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils();
/** /**
@ -220,7 +214,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
buffers, buffers,
GenericIndexedWriter.compressedByteBuffersWriteObjectStrategy(compression, bufferSize, closer), GenericIndexedWriter.compressedByteBuffersWriteObjectStrategy(compression, bufferSize, closer),
false, false,
new DecompressingByteBufferObjectStrategy(order, compression) DecompressingByteBufferObjectStrategy.of(order, compression)
); );
} }
@ -238,55 +232,136 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
return numberOfFilesRequired; return numberOfFilesRequired;
} }
protected final ObjectStrategy<T> strategy;
protected final boolean allowReverseLookup;
protected final int size;
private final boolean versionOne; public GenericIndexed(
final ObjectStrategy<T> strategy,
private final ObjectStrategy<T> strategy; final boolean allowReverseLookup,
private final boolean allowReverseLookup; final int size
private final int size;
private final ByteBuffer headerBuffer;
private final ByteBuffer firstValueBuffer;
private final ByteBuffer[] valueBuffers;
private int logBaseTwoOfElementsPerValueFile;
private int relativeIndexMask;
@Nullable
private final ByteBuffer theBuffer;
/**
* Constructor for version one.
*/
GenericIndexed(
ByteBuffer buffer,
ObjectStrategy<T> strategy,
boolean allowReverseLookup
) )
{ {
this.versionOne = true;
this.theBuffer = buffer;
this.strategy = strategy; this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup; this.allowReverseLookup = allowReverseLookup;
size = theBuffer.getInt(); this.size = size;
int indexOffset = theBuffer.position();
int valuesOffset = theBuffer.position() + size * Integer.BYTES;
buffer.position(valuesOffset);
// Ensure the value buffer's limit equals to capacity.
firstValueBuffer = buffer.slice();
valueBuffers = new ByteBuffer[]{firstValueBuffer};
buffer.position(indexOffset);
headerBuffer = buffer.slice();
} }
public abstract BufferIndexed singleThreaded();
/** @Override
* Constructor for version two. public abstract long getSerializedSize();
*/
GenericIndexed( private static final class V1<T> extends GenericIndexed<T>
{
@SuppressWarnings("rawtypes")
private static final MetaSerdeHelper<GenericIndexed.V1> META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((GenericIndexed.V1 x) -> VERSION_ONE)
.writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED)
.writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES))
.writeInt(x -> x.size);
private final ByteBuffer theBuffer;
private final int headerOffset;
private final int valuesOffset;
V1(
final ByteBuffer buffer,
final ObjectStrategy<T> strategy,
final boolean allowReverseLookup
)
{
super(strategy, allowReverseLookup, buffer.getInt());
this.theBuffer = buffer;
this.headerOffset = theBuffer.position();
this.valuesOffset = theBuffer.position() + size * Integer.BYTES;
}
@Nullable
@Override
public T get(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = Integer.BYTES;
endOffset = theBuffer.getInt(headerOffset);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = theBuffer.getInt(headerOffset + headerPosition) + Integer.BYTES;
endOffset = theBuffer.getInt(headerOffset + headerPosition + Integer.BYTES);
}
return copyBufferAndGet(theBuffer, valuesOffset + startOffset, valuesOffset + endOffset);
}
@Override
public BufferIndexed singleThreaded()
{
final ByteBuffer copyBuffer = theBuffer.asReadOnlyBuffer();
return new BufferIndexed()
{
@Nullable
@Override
protected ByteBuffer getByteBuffer(final int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = Integer.BYTES;
endOffset = theBuffer.getInt(headerOffset);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = theBuffer.getInt(headerOffset + headerPosition) + Integer.BYTES;
endOffset = theBuffer.getInt(headerOffset + headerPosition + Integer.BYTES);
}
return bufferedIndexedGetByteBuffer(copyBuffer, valuesOffset + startOffset, valuesOffset + endOffset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("theBuffer", theBuffer);
inspector.visit("copyBuffer", copyBuffer);
inspector.visit("strategy", strategy);
}
};
}
@Override
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining();
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
META_SERDE_HELPER.writeTo(channel, this);
Channels.writeFully(channel, theBuffer.asReadOnlyBuffer());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("theBuffer", theBuffer);
inspector.visit("strategy", strategy);
}
}
private static final class V2<T> extends GenericIndexed<T>
{
private final ByteBuffer headerBuffer;
private final ByteBuffer[] valueBuffers;
private final int logBaseTwoOfElementsPerValueFile;
private final int relativeIndexMask;
private V2(
ByteBuffer[] valueBuffs, ByteBuffer[] valueBuffs,
ByteBuffer headerBuff, ByteBuffer headerBuff,
ObjectStrategy<T> strategy, ObjectStrategy<T> strategy,
@ -295,20 +370,107 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
int numWritten int numWritten
) )
{ {
this.versionOne = false; super(strategy, allowReverseLookup, numWritten);
this.theBuffer = null;
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
this.valueBuffers = valueBuffs; this.valueBuffers = valueBuffs;
this.firstValueBuffer = valueBuffers[0];
this.headerBuffer = headerBuff; this.headerBuffer = headerBuff;
this.size = numWritten;
this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1; this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1;
headerBuffer.order(ByteOrder.nativeOrder()); headerBuffer.order(ByteOrder.nativeOrder());
} }
@Nullable
@Override
public T get(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & relativeIndexMask;
if (relativePositionOfIndex == 0) {
int headerPosition = index * Integer.BYTES;
startOffset = Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
}
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
}
@Override
public BufferIndexed singleThreaded()
{
final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length];
for (int i = 0; i < valueBuffers.length; i++) {
copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
}
return new BufferIndexed()
{
@Nullable
@Override
protected ByteBuffer getByteBuffer(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & relativeIndexMask;
if (relativePositionOfIndex == 0) {
int headerPosition = index * Integer.BYTES;
startOffset = 4;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
}
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum], startOffset, endOffset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("headerBuffer", headerBuffer);
// Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers
// in it are the same.
inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null);
inspector.visit("strategy", strategy);
}
};
}
@Override
public long getSerializedSize()
{
throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed.");
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
{
throw new UnsupportedOperationException(
"GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead.");
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("headerBuffer", headerBuffer);
// Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it
// are the same.
inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null);
inspector.visit("strategy", strategy);
}
}
/** /**
* Checks if {@code index} a valid `element index` in GenericIndexed. * Checks if {@code index} a valid `element index` in GenericIndexed.
* Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message. * Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message.
@ -317,7 +479,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
* *
* @param index index identifying an element of an GenericIndexed. * @param index index identifying an element of an GenericIndexed.
*/ */
private void checkIndex(int index) protected void checkIndex(int index)
{ {
if (index < 0) { if (index < 0) {
throw new IAE("Index[%s] < 0", index); throw new IAE("Index[%s] < 0", index);
@ -338,12 +500,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
return size; return size;
} }
@Override
public T get(int index)
{
return versionOne ? getVersionOne(index) : getVersionTwo(index);
}
/** /**
* Returns the index of "value" in this GenericIndexed object, or (-(insertion point) - 1) if the value is not * Returns the index of "value" in this GenericIndexed object, or (-(insertion point) - 1) if the value is not
* present, in the manner of Arrays.binarySearch. This strengthens the contract of Indexed, which only guarantees * present, in the manner of Arrays.binarySearch. This strengthens the contract of Indexed, which only guarantees
@ -393,38 +549,8 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
return IndexedIterable.create(this).iterator(); return IndexedIterable.create(this).iterator();
} }
@Override
public long getSerializedSize()
{
if (!versionOne) {
throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed.");
}
return getSerializedSizeVersionOne();
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
if (versionOne) {
writeToVersionOne(channel);
} else {
throw new UnsupportedOperationException(
"GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead.");
}
}
/**
* Create a non-thread-safe Indexed, which may perform better than the underlying Indexed.
*
* @return a non-thread-safe Indexed
*/
public GenericIndexed<T>.BufferIndexed singleThreaded()
{
return versionOne ? singleThreadedVersionOne() : singleThreadedVersionTwo();
}
@Nullable @Nullable
private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset) protected T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset)
{ {
ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer(); ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer();
int size = endOffset - startOffset; int size = endOffset - startOffset;
@ -439,21 +565,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
return strategy.fromByteBuffer(copyValueBuffer, size); return strategy.fromByteBuffer(copyValueBuffer, size);
} }
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("versionOne", versionOne);
inspector.visit("headerBuffer", headerBuffer);
if (versionOne) {
inspector.visit("firstValueBuffer", firstValueBuffer);
} else {
// Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it
// are the same.
inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null);
}
inspector.visit("strategy", strategy);
}
/** /**
* Single-threaded view. * Single-threaded view.
*/ */
@ -567,10 +678,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
// nothing to close // nothing to close
} }
///////////////
// VERSION ONE
///////////////
private static <T> GenericIndexed<T> createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T> strategy) private static <T> GenericIndexed<T> createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T> strategy)
{ {
boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
@ -579,7 +686,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
bufferToUse.limit(bufferToUse.position() + size); bufferToUse.limit(bufferToUse.position() + size);
byteBuffer.position(bufferToUse.limit()); byteBuffer.position(bufferToUse.limit());
return new GenericIndexed<>( return new GenericIndexed.V1<>(
bufferToUse, bufferToUse,
strategy, strategy,
allowReverseLookup allowReverseLookup
@ -597,7 +704,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
if (!objects.hasNext()) { if (!objects.hasNext()) {
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0); final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
buffer.flip(); buffer.flip();
return new GenericIndexed<>(buffer, resultObjectStrategy, allowReverseLookup); return new GenericIndexed.V1<>(buffer, resultObjectStrategy, allowReverseLookup);
} }
int count = 0; int count = 0;
@ -642,79 +749,9 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
valuesOut.writeTo(theBuffer); valuesOut.writeTo(theBuffer);
theBuffer.flip(); theBuffer.flip();
return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup); return new GenericIndexed.V1<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup);
} }
private long getSerializedSizeVersionOne()
{
return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining();
}
@Nullable
private T getVersionOne(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = Integer.BYTES;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
}
return copyBufferAndGet(firstValueBuffer, startOffset, endOffset);
}
private BufferIndexed singleThreadedVersionOne()
{
final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer();
return new BufferIndexed()
{
@Nullable
@Override
protected ByteBuffer getByteBuffer(final int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = Integer.BYTES;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
}
return bufferedIndexedGetByteBuffer(copyBuffer, startOffset, endOffset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("headerBuffer", headerBuffer);
inspector.visit("copyBuffer", copyBuffer);
inspector.visit("strategy", strategy);
}
};
}
private void writeToVersionOne(WritableByteChannel channel) throws IOException
{
META_SERDE_HELPER.writeTo(channel, this);
Channels.writeFully(channel, theBuffer.asReadOnlyBuffer());
}
///////////////
// VERSION TWO
///////////////
private static <T> GenericIndexed<T> createGenericIndexedVersionTwo( private static <T> GenericIndexed<T> createGenericIndexedVersionTwo(
ByteBuffer byteBuffer, ByteBuffer byteBuffer,
ObjectStrategy<T> strategy, ObjectStrategy<T> strategy,
@ -739,7 +776,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer(); valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer();
} }
ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName)); ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName));
return new GenericIndexed<>( return new GenericIndexed.V2<>(
valueBuffersToUse, valueBuffersToUse,
headerBuffer, headerBuffer,
strategy, strategy,
@ -752,70 +789,4 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
throw new RuntimeException("File mapping failed.", e); throw new RuntimeException("File mapping failed.", e);
} }
} }
@Nullable
private T getVersionTwo(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & relativeIndexMask;
if (relativePositionOfIndex == 0) {
int headerPosition = index * Integer.BYTES;
startOffset = Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
}
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
}
private BufferIndexed singleThreadedVersionTwo()
{
final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length];
for (int i = 0; i < valueBuffers.length; i++) {
copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
}
return new BufferIndexed()
{
@Nullable
@Override
protected ByteBuffer getByteBuffer(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & relativeIndexMask;
if (relativePositionOfIndex == 0) {
int headerPosition = index * Integer.BYTES;
startOffset = 4;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Integer.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
}
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum], startOffset, endOffset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("headerBuffer", headerBuffer);
// Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers
// in it are the same.
inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null);
inspector.visit("strategy", strategy);
}
};
}
} }