Use the whole frame when writing rows. (#17094) (#17195)

This patch makes the following adjustments to enable writing larger
single rows to frames:
1) RowBasedFrameWriter: Max out allocation size on the final doubling.
   For example, if the final allocation "naturally" would be 1 MiB but
   the max frame size is 900 KiB, use 900 KiB rather than failing the
   1 MiB allocation.
2) AppendableMemory: In reserveAdditional, release the last block if it
   is empty. This eliminates waste when a frame writer uses a
   successive-doubling approach to find the right allocation size.
3) ArenaMemoryAllocator: Reclaim memory from the last allocation when
   the last allocation is closed.

Prior to these changes, a single row could be much smaller than the
frame size and still fail to be added to the frame.

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
Kashif Faraz 2024-10-02 07:44:33 +05:30 committed by GitHub
parent 2f2807d379
commit 4a8008aebb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 347 additions and 90 deletions

View File

@ -1805,7 +1805,7 @@ public class MSQWindowTest extends MSQTestBase
.setSql(
"select cityName, added, SUM(added) OVER () cc from wikipedia")
.setQueryContext(customContext)
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15921, 200))
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15922, 200))
.verifyResults();
}

View File

@ -54,10 +54,10 @@ public class AppendableMemory implements Closeable
// One holder for every Memory we've allocated.
private final List<ResourceHolder<WritableMemory>> blockHolders = new ArrayList<>();
// The amount of space that has been used from each Memory block. Same length as "memoryHolders".
// The amount of space that has been used from each Memory block. Same length as "blockHolders".
private final IntList limits = new IntArrayList();
// The global starting position for each Memory block (blockNumber -> position). Same length as "memoryHolders".
// The global starting position for each Memory block (blockNumber -> position). Same length as "blockHolders".
private final LongArrayList globalStartPositions = new LongArrayList();
// Whether the blocks we've allocated are "packed"; meaning all non-final block limits equal the allocationSize.
@ -104,6 +104,36 @@ public class AppendableMemory implements Closeable
return cursor;
}
/**
 * Largest value that a call to {@link #reserveAdditional(int)} can be given and still succeed.
 *
 * @return the reservable byte count, capped at {@link Integer#MAX_VALUE}
 */
public int availableToReserve()
{
  final int blockIdx = currentBlockNumber();

  // Defaults cover the "no block allocated yet" case: nothing left in a block, and nothing to reclaim.
  long remainingInBlock = 0;
  boolean blockUnused = false;

  if (blockIdx >= 0) {
    final int usedInBlock = limits.getInt(blockIdx);
    remainingInBlock = blockHolders.get(blockIdx).get().getCapacity() - usedInBlock;
    blockUnused = (usedInBlock == 0);
  }

  // An unused current block is handed back by releaseLastBlockIfEmpty(), so its space is counted
  // together with whatever the allocator already reports as available.
  long reservableFromAllocator = allocator.available();
  if (blockUnused) {
    reservableFromAllocator += remainingInBlock;
  }

  // Take the better of "fresh allocation" and "remainder of the current block", then clamp to int range.
  final long reservable = Math.max(reservableFromAllocator, remainingInBlock);
  return (int) Math.min(Integer.MAX_VALUE, reservable);
}
/**
* Ensure that at least "bytes" amount of space is available after the cursor. Allocates a new block if needed.
* Note: the amount of bytes is guaranteed to be in a *single* block.
@ -126,11 +156,13 @@ public class AppendableMemory implements Closeable
return true;
}
releaseLastBlockIfEmpty();
if (bytes > allocator.available()) {
return false;
}
final int idx = blockHolders.size() - 1;
final int idx = currentBlockNumber();
if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) {
// Allocation needed.
@ -228,6 +260,9 @@ public class AppendableMemory implements Closeable
cursor.set(currentBlockMemory, newLimit, currentBlockMemory.getCapacity() - newLimit);
}
/**
* Current used size, in bytes.
*/
public long size()
{
long sz = 0;
@ -295,12 +330,21 @@ public class AppendableMemory implements Closeable
cursor.set(blockMemory, 0, blockMemory.getCapacity());
}
private int currentBlockNumber()
private void releaseLastBlockIfEmpty()
{
if (blockHolders.isEmpty()) {
return NO_BLOCK;
} else {
return blockHolders.size() - 1;
final int lastBlockNumber = currentBlockNumber();
if (lastBlockNumber != NO_BLOCK && limits.getInt(lastBlockNumber) == 0) {
blockHolders.remove(lastBlockNumber).close();
limits.removeInt(lastBlockNumber);
}
}
/**
 * Position of the current (last) block within {@link #blockHolders} and {@link #limits}, or
 * {@link #NO_BLOCK} when no block exists.
 *
 * Computed as "number of blocks minus one", so the no-block result relies on {@link #NO_BLOCK} being -1.
 */
private int currentBlockNumber()
{
  final int blockCount = blockHolders.size();
  return blockCount - 1;
}
}

View File

@ -37,6 +37,7 @@ public class ArenaMemoryAllocator implements MemoryAllocator
private final WritableMemory arena;
private long allocations = 0;
private long position = 0;
private WritableMemory lastAllocation;
private ArenaMemoryAllocator(WritableMemory arena)
{
@ -64,20 +65,23 @@ public class ArenaMemoryAllocator implements MemoryAllocator
@Override
public Optional<ResourceHolder<WritableMemory>> allocate(final long size)
{
if (position + size < arena.getCapacity()) {
if (position + size <= arena.getCapacity()) {
final long start = position;
allocations++;
position += size;
final WritableMemory memory = arena.writableRegion(start, size, ByteOrder.LITTLE_ENDIAN);
lastAllocation = memory;
return Optional.of(
new ResourceHolder<WritableMemory>()
{
private WritableMemory memory = arena.writableRegion(start, size, ByteOrder.LITTLE_ENDIAN);
boolean closed;
@Override
public WritableMemory get()
{
if (memory == null) {
if (closed) {
throw new ISE("Already closed");
}
@ -87,10 +91,21 @@ public class ArenaMemoryAllocator implements MemoryAllocator
@Override
public void close()
{
memory = null;
if (closed) {
return;
}
closed = true;
//noinspection ObjectEquality
if (memory == lastAllocation) {
// Last allocation closed; decrement position to enable partial arena reuse.
position -= memory.getCapacity();
lastAllocation = null;
}
if (--allocations == 0) {
// All allocations closed; reset position to enable arena reuse.
// All allocations closed; reset position to enable full arena reuse.
position = 0;
}
}

View File

@ -77,8 +77,10 @@ public class HeapMemoryAllocator implements MemoryAllocator
@Override
public void close()
{
memory = null;
bytesAllocated -= size;
if (memory != null) {
memory = null;
bytesAllocated -= size;
}
}
}
);

View File

@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.AppendableMemory;
@ -313,10 +314,22 @@ public class RowBasedFrameWriter implements FrameWriter
// Reset to beginning of loop.
i = -1;
final int priorAllocation = BASE_DATA_ALLOCATION_SIZE * reserveMultiple;
// Try again with a bigger allocation.
reserveMultiple *= 2;
if (!dataMemory.reserveAdditional(Ints.checkedCast((long) BASE_DATA_ALLOCATION_SIZE * reserveMultiple))) {
final int nextAllocation = Math.min(
dataMemory.availableToReserve(),
Ints.checkedCast((long) BASE_DATA_ALLOCATION_SIZE * reserveMultiple)
);
if (nextAllocation > priorAllocation) {
if (!dataMemory.reserveAdditional(nextAllocation)) {
// Shouldn't see this unless availableToReserve lied to us.
throw DruidException.defensive("Unexpected failure of dataMemory.reserveAdditional");
}
} else {
return false;
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.allocation;
/**
 * Runs the shared {@link BaseMemoryAllocatorTest} cases against {@link ArenaMemoryAllocator}.
 */
public class ArenaMemoryAllocatorTest extends BaseMemoryAllocatorTest
{
  /**
   * Supplies the allocator under test, built through {@link ArenaMemoryAllocator#createOnHeap}.
   */
  @Override
  protected MemoryAllocator makeAllocator(int capacity)
  {
    final MemoryAllocator arenaAllocator = ArenaMemoryAllocator.createOnHeap(capacity);
    return arenaAllocator;
  }
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.allocation;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Test;
import java.util.Optional;
/**
 * Shared test cases for {@link MemoryAllocator}. Each concrete implementation provides a subclass
 * that supplies the allocator through {@link #makeAllocator(int)}.
 */
public abstract class BaseMemoryAllocatorTest
{
  private static final int ALLOCATOR_SIZE = 10;

  /**
   * Creates the allocator under test with the given capacity.
   */
  protected abstract MemoryAllocator makeAllocator(int capacity);

  /**
   * A single allocation of the full capacity succeeds and every byte of it is writable.
   */
  @Test
  public void testAllocationInSinglePass()
  {
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);

    final Optional<ResourceHolder<WritableMemory>> maybeHolder = allocator.allocate(ALLOCATOR_SIZE);
    Assert.assertTrue(maybeHolder.isPresent());

    final WritableMemory region = maybeHolder.get().get();
    writeBytes(region, ALLOCATOR_SIZE, (byte) 0xFF);
  }

  /**
   * Two allocations that together use the full capacity do not overlap each other.
   */
  @Test
  public void testAllocationInMultiplePasses()
  {
    final int firstSize = ALLOCATOR_SIZE - 4;
    final int secondSize = 4;
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);

    final Optional<ResourceHolder<WritableMemory>> maybeFirst = allocator.allocate(firstSize);
    Assert.assertTrue(maybeFirst.isPresent());
    final WritableMemory firstRegion = maybeFirst.get().get();

    final Optional<ResourceHolder<WritableMemory>> maybeSecond = allocator.allocate(secondSize);
    Assert.assertTrue(maybeSecond.isPresent());
    final WritableMemory secondRegion = maybeSecond.get().get();

    writeBytes(firstRegion, firstSize, (byte) 0xFF);
    writeBytes(secondRegion, secondSize, (byte) 0xFE);

    // Read everything back: writes to one region must not have clobbered the other.
    assertBytes(firstRegion, firstSize, (byte) 0xFF);
    assertBytes(secondRegion, secondSize, (byte) 0xFE);
  }

  /**
   * Closing the same holder twice returns its space only once.
   */
  @Test
  public void testReleaseAllocationTwice()
  {
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);
    final int chunk = 4;

    final Optional<ResourceHolder<WritableMemory>> first = allocator.allocate(chunk);
    final Optional<ResourceHolder<WritableMemory>> second = allocator.allocate(chunk);
    Assert.assertTrue(first.isPresent());
    Assert.assertTrue(second.isPresent());
    Assert.assertEquals(ALLOCATOR_SIZE - chunk * 2, allocator.available());

    // Closing the most recent allocation gives its space back.
    second.get().close();
    Assert.assertEquals(ALLOCATOR_SIZE - chunk, allocator.available());

    // A repeated close of the same holder is a no-op.
    second.get().close();
    Assert.assertEquals(ALLOCATOR_SIZE - chunk, allocator.available());

    // Closing the remaining allocation restores the full capacity.
    first.get().close();
    Assert.assertEquals(ALLOCATOR_SIZE, allocator.available());
  }

  /**
   * Closing allocations newest-first returns space step by step.
   */
  @Test
  public void testReleaseLastAllocationFirst()
  {
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);
    final int chunk = 4;

    final Optional<ResourceHolder<WritableMemory>> first = allocator.allocate(chunk);
    final Optional<ResourceHolder<WritableMemory>> second = allocator.allocate(chunk);
    Assert.assertTrue(first.isPresent());
    Assert.assertTrue(second.isPresent());
    Assert.assertEquals(ALLOCATOR_SIZE - chunk * 2, allocator.available());

    // Newest allocation goes first.
    second.get().close();
    Assert.assertEquals(ALLOCATOR_SIZE - chunk, allocator.available());

    // Then the older one.
    first.get().close();
    Assert.assertEquals(ALLOCATOR_SIZE, allocator.available());
  }

  /**
   * Closing allocations oldest-first still ends with the full capacity available.
   */
  @Test
  public void testReleaseLastAllocationLast()
  {
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);
    final int chunk = 4;

    final Optional<ResourceHolder<WritableMemory>> first = allocator.allocate(chunk);
    final Optional<ResourceHolder<WritableMemory>> second = allocator.allocate(chunk);
    Assert.assertTrue(first.isPresent());
    Assert.assertTrue(second.isPresent());
    Assert.assertEquals(ALLOCATOR_SIZE - chunk * 2, allocator.available());

    // available() is deliberately not checked between the two closes: the arena allocator does not
    // reclaim a non-final allocation while the heap allocator does, so the intermediate value differs
    // by implementation. Both agree once every allocation has been closed.
    first.get().close();
    second.get().close();
    Assert.assertEquals(ALLOCATOR_SIZE, allocator.available());
  }

  /**
   * A single request larger than the capacity is refused.
   */
  @Test
  public void testOverallocationInSinglePass()
  {
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);
    final Optional<ResourceHolder<WritableMemory>> maybeHolder = allocator.allocate(ALLOCATOR_SIZE + 1);
    Assert.assertFalse(maybeHolder.isPresent());
  }

  /**
   * A second request that would push total usage past the capacity is refused.
   */
  @Test
  public void testOverallocationInMultiplePasses()
  {
    final MemoryAllocator allocator = makeAllocator(ALLOCATOR_SIZE);
    final Optional<ResourceHolder<WritableMemory>> maybeHolder = allocator.allocate(ALLOCATOR_SIZE - 4);
    Assert.assertTrue(maybeHolder.isPresent());

    final Optional<ResourceHolder<WritableMemory>> overflow = allocator.allocate(5);
    Assert.assertFalse(overflow.isPresent());
  }

  /**
   * Writes {@code value} into the first {@code count} bytes of {@code memory}.
   */
  private static void writeBytes(final WritableMemory memory, final int count, final byte value)
  {
    for (int offset = 0; offset < count; offset++) {
      memory.putByte(offset, value);
    }
  }

  /**
   * Asserts that each of the first {@code count} bytes of {@code memory} equals {@code expected}.
   */
  private static void assertBytes(final WritableMemory memory, final int count, final byte expected)
  {
    for (int offset = 0; offset < count; offset++) {
      Assert.assertEquals(expected, memory.getByte(offset));
    }
  }
}

View File

@ -19,78 +19,11 @@
package org.apache.druid.frame.allocation;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.collections.ResourceHolder;
import org.junit.Assert;
import org.junit.Test;
import java.util.Optional;
public class HeapMemoryAllocatorTest
public class HeapMemoryAllocatorTest extends BaseMemoryAllocatorTest
{
private static final int ALLOCATOR_SIZE = 10;
@Test
public void testAllocationInSinglePass()
@Override
protected MemoryAllocator makeAllocator(int capacity)
{
MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE);
Optional<ResourceHolder<WritableMemory>> memoryResourceHolderOptional = heapMemoryAllocator.allocate(ALLOCATOR_SIZE);
Assert.assertTrue(memoryResourceHolderOptional.isPresent());
ResourceHolder<WritableMemory> memoryResourceHolder = memoryResourceHolderOptional.get();
WritableMemory memory = memoryResourceHolder.get();
for (int i = 0; i < ALLOCATOR_SIZE; ++i) {
memory.putByte(i, (byte) 0xFF);
}
return new HeapMemoryAllocator(capacity);
}
@Test
public void testAllocationInMultiplePasses()
{
MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE);
Optional<ResourceHolder<WritableMemory>> memoryResourceHolderOptional1 = heapMemoryAllocator.allocate(ALLOCATOR_SIZE
- 4);
Assert.assertTrue(memoryResourceHolderOptional1.isPresent());
ResourceHolder<WritableMemory> memoryResourceHolder1 = memoryResourceHolderOptional1.get();
WritableMemory memory1 = memoryResourceHolder1.get();
Optional<ResourceHolder<WritableMemory>> memoryResourceHolderOptional2 = heapMemoryAllocator.allocate(4);
Assert.assertTrue(memoryResourceHolderOptional2.isPresent());
ResourceHolder<WritableMemory> memoryResourceHolder2 = memoryResourceHolderOptional2.get();
WritableMemory memory2 = memoryResourceHolder2.get();
for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) {
memory1.putByte(i, (byte) 0xFF);
}
for (int i = 0; i < 4; ++i) {
memory2.putByte(i, (byte) 0xFE);
}
// Readback to ensure that value hasn't been overwritten
for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) {
Assert.assertEquals((byte) 0xFF, memory1.getByte(i));
}
for (int i = 0; i < 4; ++i) {
Assert.assertEquals((byte) 0xFE, memory2.getByte(i));
}
}
@Test
public void testOverallocationInSinglePass()
{
MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE);
Optional<ResourceHolder<WritableMemory>> memoryResourceHolderOptional =
heapMemoryAllocator.allocate(ALLOCATOR_SIZE + 1);
Assert.assertFalse(memoryResourceHolderOptional.isPresent());
}
@Test
public void testOverallocationInMultiplePasses()
{
MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE);
Optional<ResourceHolder<WritableMemory>> memoryResourceHolderOptional =
heapMemoryAllocator.allocate(ALLOCATOR_SIZE - 4);
Assert.assertTrue(memoryResourceHolderOptional.isPresent());
Assert.assertFalse(heapMemoryAllocator.allocate(5).isPresent());
}
}

View File

@ -20,21 +20,76 @@
package org.apache.druid.frame.write;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.AppendableMemory;
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
import org.apache.druid.frame.allocation.HeapMemoryAllocator;
import org.apache.druid.frame.field.LongFieldWriter;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class RowBasedFrameWriterTest
public class RowBasedFrameWriterTest extends InitializedNullHandlingTest
{
@Test
public void testAddSelectionWithException()
public void test_addSelection_singleLargeRow()
{
final RowSignature signature =
RowSignature.builder()
.add("n", ColumnType.LONG)
.add("s", ColumnType.STRING)
.build();
final byte[] largeUtf8 = new byte[990000];
Arrays.fill(largeUtf8, (byte) 'F');
final String largeString = StringUtils.fromUtf8(largeUtf8);
final Row largeRow = new MapBasedRow(0L, ImmutableMap.of("n", 3L, "s", largeString));
final FrameWriterFactory frameWriterFactory = FrameWriters.makeRowBasedFrameWriterFactory(
new ArenaMemoryAllocatorFactory(1_000_000),
signature,
ImmutableList.of(),
false
);
final ColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> largeRow,
signature,
false,
false
);
final Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(columnSelectorFactory)) {
Assert.assertTrue(frameWriter.addSelection());
frame = Frame.wrap(frameWriter.toByteArray());
}
FrameTestUtil.assertRowsEqual(
Sequences.simple(Collections.singletonList(ImmutableList.of(3L, largeString))),
FrameTestUtil.readRowsFromCursorFactory(FrameReader.create(signature).makeCursorFactory(frame))
);
}
@Test
public void test_addSelection_withException()
{
String colName = "colName";
String errorMsg = "Frame writer exception";