From 4a8008aebb7b3f324325feabd1b5a11a07e76022 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 2 Oct 2024 07:44:33 +0530 Subject: [PATCH] Use the whole frame when writing rows. (#17094) (#17195) This patch makes the following adjustments to enable writing larger single rows to frames: 1) RowBasedFrameWriter: Max out allocation size on the final doubling. i.e., if the final allocation "naturally" would be 1 MiB but the max frame size is 900 KiB, use 900 KiB rather than failing the 1 MiB allocation. 2) AppendableMemory: In reserveAdditional, release the last block if it is empty. This eliminates waste when a frame writer uses a successive-doubling approach to find the right allocation size. 3) ArenaMemoryAllocator: Reclaim memory from the last allocation when the last allocation is closed. Prior to these changes, a single row could be much smaller than the frame size and still fail to be added to the frame. Co-authored-by: Gian Merlino --- .../apache/druid/msq/exec/MSQWindowTest.java | 2 +- .../frame/allocation/AppendableMemory.java | 60 ++++++- .../allocation/ArenaMemoryAllocator.java | 25 ++- .../frame/allocation/HeapMemoryAllocator.java | 6 +- .../frame/write/RowBasedFrameWriter.java | 15 +- .../allocation/ArenaMemoryAllocatorTest.java | 29 +++ .../allocation/BaseMemoryAllocatorTest.java | 166 ++++++++++++++++++ .../allocation/HeapMemoryAllocatorTest.java | 75 +------- .../frame/write/RowBasedFrameWriterTest.java | 59 ++++++- 9 files changed, 347 insertions(+), 90 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/frame/allocation/ArenaMemoryAllocatorTest.java create mode 100644 processing/src/test/java/org/apache/druid/frame/allocation/BaseMemoryAllocatorTest.java diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index 486014101c5..5c8adbd89f6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -1805,7 +1805,7 @@ public class MSQWindowTest extends MSQTestBase .setSql( "select cityName, added, SUM(added) OVER () cc from wikipedia") .setQueryContext(customContext) - .setExpectedMSQFault(new TooManyRowsInAWindowFault(15921, 200)) + .setExpectedMSQFault(new TooManyRowsInAWindowFault(15922, 200)) .verifyResults(); } diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java index 11edf396c9b..8d961f298ce 100644 --- a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java +++ b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java @@ -54,10 +54,10 @@ public class AppendableMemory implements Closeable // One holder for every Memory we've allocated. private final List> blockHolders = new ArrayList<>(); - // The amount of space that has been used from each Memory block. Same length as "memoryHolders". + // The amount of space that has been used from each Memory block. Same length as "blockHolders". private final IntList limits = new IntArrayList(); - // The global starting position for each Memory block (blockNumber -> position). Same length as "memoryHolders". + // The global starting position for each Memory block (blockNumber -> position). Same length as "blockHolders". private final LongArrayList globalStartPositions = new LongArrayList(); // Whether the blocks we've allocated are "packed"; meaning all non-final block limits equal the allocationSize. @@ -104,6 +104,36 @@ public class AppendableMemory implements Closeable return cursor; } + /** + * Maximum number that can be successfully passed to {@link #reserveAdditional(int)}. + */ + public int availableToReserve() + { + final int currentBlockIdx = currentBlockNumber(); + final long availableInCurrentBlock; + final boolean currentBlockIsEmpty; + + if (currentBlockIdx < 0) { + availableInCurrentBlock = 0; + currentBlockIsEmpty = false; + } else { + final int usedInCurrentBlock = limits.getInt(currentBlockIdx); + availableInCurrentBlock = blockHolders.get(currentBlockIdx).get().getCapacity() - usedInCurrentBlock; + currentBlockIsEmpty = usedInCurrentBlock == 0; + } + + // If currentBlockIsEmpty, add availableInCurrentBlock to account for reclamation in reclaimLastBlockIfEmpty(). + final long availableInAllocator = allocator.available() + (currentBlockIsEmpty ? availableInCurrentBlock : 0); + + return (int) Math.min( + Integer.MAX_VALUE, + Math.max( + availableInAllocator, + availableInCurrentBlock + ) + ); + } + /** * Ensure that at least "bytes" amount of space is available after the cursor. Allocates a new block if needed. * Note: the amount of bytes is guaranteed to be in a *single* block. @@ -126,11 +156,13 @@ public class AppendableMemory implements Closeable return true; } + releaseLastBlockIfEmpty(); + if (bytes > allocator.available()) { return false; } - final int idx = blockHolders.size() - 1; + final int idx = currentBlockNumber(); if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) { // Allocation needed. @@ -228,6 +260,9 @@ public class AppendableMemory implements Closeable cursor.set(currentBlockMemory, newLimit, currentBlockMemory.getCapacity() - newLimit); } + /** + * Current used size, in bytes. + */ public long size() { long sz = 0; @@ -295,12 +330,21 @@ public class AppendableMemory implements Closeable cursor.set(blockMemory, 0, blockMemory.getCapacity()); } - private int currentBlockNumber() + private void releaseLastBlockIfEmpty() { - if (blockHolders.isEmpty()) { - return NO_BLOCK; - } else { - return blockHolders.size() - 1; + final int lastBlockNumber = currentBlockNumber(); + if (lastBlockNumber != NO_BLOCK && limits.getInt(lastBlockNumber) == 0) { + blockHolders.remove(lastBlockNumber).close(); + limits.removeInt(lastBlockNumber); } } + + /** + * Returns the index into {@link #blockHolders} and {@link #limits} of the current block, or {@link #NO_BLOCK} + * if there are no blocks. + */ + private int currentBlockNumber() + { + return blockHolders.size() - 1; + } } diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/ArenaMemoryAllocator.java b/processing/src/main/java/org/apache/druid/frame/allocation/ArenaMemoryAllocator.java index 7b8c23e2936..ebc49333552 100644 --- a/processing/src/main/java/org/apache/druid/frame/allocation/ArenaMemoryAllocator.java +++ b/processing/src/main/java/org/apache/druid/frame/allocation/ArenaMemoryAllocator.java @@ -37,6 +37,7 @@ public class ArenaMemoryAllocator implements MemoryAllocator private final WritableMemory arena; private long allocations = 0; private long position = 0; + private WritableMemory lastAllocation; private ArenaMemoryAllocator(WritableMemory arena) { @@ -64,20 +65,23 @@ public class ArenaMemoryAllocator implements MemoryAllocator @Override public Optional> allocate(final long size) { - if (position + size < arena.getCapacity()) { + if (position + size <= arena.getCapacity()) { final long start = position; allocations++; position += size; + final WritableMemory memory = arena.writableRegion(start, size, ByteOrder.LITTLE_ENDIAN); + lastAllocation = memory; + return Optional.of( new ResourceHolder() { - private WritableMemory memory = arena.writableRegion(start, size, ByteOrder.LITTLE_ENDIAN); + boolean closed; @Override public WritableMemory get() { - if (memory == null) { + if (closed) { throw new ISE("Already closed"); } @@ -87,10 +91,21 @@ public class ArenaMemoryAllocator implements MemoryAllocator @Override public void close() { - memory = null; + if (closed) { + return; + } + + closed = true; + + //noinspection ObjectEquality + if (memory == lastAllocation) { + // Last allocation closed; decrement position to enable partial arena reuse. + position -= memory.getCapacity(); + lastAllocation = null; + } if (--allocations == 0) { - // All allocations closed; reset position to enable arena reuse. + // All allocations closed; reset position to enable full arena reuse. position = 0; } } diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java b/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java index ee3af073f88..a78c53c4a37 100644 --- a/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java +++ b/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java @@ -77,8 +77,10 @@ public class HeapMemoryAllocator implements MemoryAllocator @Override public void close() { - memory = null; - bytesAllocated -= size; + if (memory != null) { + memory = null; + bytesAllocated -= size; + } } } ); diff --git a/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java b/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java index beae50dca9d..c33b4c32b8a 100644 --- a/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java +++ b/processing/src/main/java/org/apache/druid/frame/write/RowBasedFrameWriter.java @@ -23,6 +23,7 @@ import com.google.common.base.Throwables; import com.google.common.primitives.Ints; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.AppendableMemory; @@ -313,10 +314,22 @@ public class RowBasedFrameWriter implements FrameWriter // Reset to beginning of loop. i = -1; + final int priorAllocation = BASE_DATA_ALLOCATION_SIZE * reserveMultiple; + // Try again with a bigger allocation. reserveMultiple *= 2; - if (!dataMemory.reserveAdditional(Ints.checkedCast((long) BASE_DATA_ALLOCATION_SIZE * reserveMultiple))) { + final int nextAllocation = Math.min( + dataMemory.availableToReserve(), + Ints.checkedCast((long) BASE_DATA_ALLOCATION_SIZE * reserveMultiple) + ); + + if (nextAllocation > priorAllocation) { + if (!dataMemory.reserveAdditional(nextAllocation)) { + // Shouldn't see this unless availableToReserve lied to us. + throw DruidException.defensive("Unexpected failure of dataMemory.reserveAdditional"); + } + } else { return false; } diff --git a/processing/src/test/java/org/apache/druid/frame/allocation/ArenaMemoryAllocatorTest.java b/processing/src/test/java/org/apache/druid/frame/allocation/ArenaMemoryAllocatorTest.java new file mode 100644 index 00000000000..e1a637ea047 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/allocation/ArenaMemoryAllocatorTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.allocation; + +public class ArenaMemoryAllocatorTest extends BaseMemoryAllocatorTest +{ + @Override + protected MemoryAllocator makeAllocator(int capacity) + { + return ArenaMemoryAllocator.createOnHeap(capacity); + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/allocation/BaseMemoryAllocatorTest.java b/processing/src/test/java/org/apache/druid/frame/allocation/BaseMemoryAllocatorTest.java new file mode 100644 index 00000000000..cd3952accdc --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/allocation/BaseMemoryAllocatorTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.allocation; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.collections.ResourceHolder; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + +/** + * Tests for {@link MemoryAllocator}, subclassed for each concrete implementation. + */ +public abstract class BaseMemoryAllocatorTest +{ + private static final int ALLOCATOR_SIZE = 10; + + protected abstract MemoryAllocator makeAllocator(int capacity); + + @Test + public void testAllocationInSinglePass() + { + MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + Optional> memoryResourceHolderOptional = memoryAllocator.allocate(ALLOCATOR_SIZE); + Assert.assertTrue(memoryResourceHolderOptional.isPresent()); + ResourceHolder memoryResourceHolder = memoryResourceHolderOptional.get(); + WritableMemory memory = memoryResourceHolder.get(); + for (int i = 0; i < ALLOCATOR_SIZE; ++i) { + memory.putByte(i, (byte) 0xFF); + } + } + + @Test + public void testAllocationInMultiplePasses() + { + MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + + Optional> memoryResourceHolderOptional1 = memoryAllocator.allocate(ALLOCATOR_SIZE + - 4); + Assert.assertTrue(memoryResourceHolderOptional1.isPresent()); + ResourceHolder memoryResourceHolder1 = memoryResourceHolderOptional1.get(); + WritableMemory memory1 = memoryResourceHolder1.get(); + + Optional> memoryResourceHolderOptional2 = memoryAllocator.allocate(4); + Assert.assertTrue(memoryResourceHolderOptional2.isPresent()); + ResourceHolder memoryResourceHolder2 = memoryResourceHolderOptional2.get(); + WritableMemory memory2 = memoryResourceHolder2.get(); + + for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) { + memory1.putByte(i, (byte) 0xFF); + } + for (int i = 0; i < 4; ++i) { + memory2.putByte(i, (byte) 0xFE); + } + // Readback to ensure that value hasn't been overwritten + for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) { + Assert.assertEquals((byte) 0xFF, memory1.getByte(i)); + } + for (int i = 0; i < 4; ++i) { + Assert.assertEquals((byte) 0xFE, memory2.getByte(i)); + } + } + + @Test + public void testReleaseAllocationTwice() + { + final MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + final int allocationSize = 4; + + final Optional> holder1 = memoryAllocator.allocate(allocationSize); + final Optional> holder2 = memoryAllocator.allocate(allocationSize); + Assert.assertTrue(holder1.isPresent()); + Assert.assertTrue(holder2.isPresent()); + Assert.assertEquals(ALLOCATOR_SIZE - allocationSize * 2, memoryAllocator.available()); + + // Release the second allocation. + holder2.get().close(); + Assert.assertEquals(ALLOCATOR_SIZE - allocationSize, memoryAllocator.available()); + + // Release again-- does nothing. + holder2.get().close(); + Assert.assertEquals(ALLOCATOR_SIZE - allocationSize, memoryAllocator.available()); + + // Release the first allocation. + holder1.get().close(); + Assert.assertEquals(ALLOCATOR_SIZE, memoryAllocator.available()); + } + + @Test + public void testReleaseLastAllocationFirst() + { + final MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + final int allocationSize = 4; + + final Optional> holder1 = memoryAllocator.allocate(allocationSize); + final Optional> holder2 = memoryAllocator.allocate(allocationSize); + Assert.assertTrue(holder1.isPresent()); + Assert.assertTrue(holder2.isPresent()); + Assert.assertEquals(ALLOCATOR_SIZE - allocationSize * 2, memoryAllocator.available()); + + // Release the second allocation. + holder2.get().close(); + Assert.assertEquals(ALLOCATOR_SIZE - allocationSize, memoryAllocator.available()); + + // Release the first allocation. + holder1.get().close(); + Assert.assertEquals(ALLOCATOR_SIZE, memoryAllocator.available()); + } + + @Test + public void testReleaseLastAllocationLast() + { + final MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + final int allocationSize = 4; + + final Optional> holder1 = memoryAllocator.allocate(allocationSize); + final Optional> holder2 = memoryAllocator.allocate(allocationSize); + Assert.assertTrue(holder1.isPresent()); + Assert.assertTrue(holder2.isPresent()); + Assert.assertEquals(ALLOCATOR_SIZE - allocationSize * 2, memoryAllocator.available()); + + // Don't check memoryAllocator.available() after holder1 is closed; behavior is not consistent between arena + // and heap. Arena won't reclaim this allocation because it wasn't the final one; heap will reclaim it. + // They converge to fully-reclaimed once all allocations are closed. + holder1.get().close(); + holder2.get().close(); + Assert.assertEquals(ALLOCATOR_SIZE, memoryAllocator.available()); + } + + @Test + public void testOverallocationInSinglePass() + { + MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + Optional> memoryResourceHolderOptional = + memoryAllocator.allocate(ALLOCATOR_SIZE + 1); + Assert.assertFalse(memoryResourceHolderOptional.isPresent()); + } + + @Test + public void testOverallocationInMultiplePasses() + { + MemoryAllocator memoryAllocator = makeAllocator(ALLOCATOR_SIZE); + Optional> memoryResourceHolderOptional = + memoryAllocator.allocate(ALLOCATOR_SIZE - 4); + Assert.assertTrue(memoryResourceHolderOptional.isPresent()); + Assert.assertFalse(memoryAllocator.allocate(5).isPresent()); + } +} diff --git a/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java b/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java index ac69eaee8a5..67f63d312f5 100644 --- a/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java +++ b/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java @@ -19,78 +19,11 @@ package org.apache.druid.frame.allocation; -import org.apache.datasketches.memory.WritableMemory; -import org.apache.druid.collections.ResourceHolder; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Optional; - -public class HeapMemoryAllocatorTest +public class HeapMemoryAllocatorTest extends BaseMemoryAllocatorTest { - private static final int ALLOCATOR_SIZE = 10; - - @Test - public void testAllocationInSinglePass() + @Override + protected MemoryAllocator makeAllocator(int capacity) { - MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); - Optional> memoryResourceHolderOptional = heapMemoryAllocator.allocate(ALLOCATOR_SIZE); - Assert.assertTrue(memoryResourceHolderOptional.isPresent()); - ResourceHolder memoryResourceHolder = memoryResourceHolderOptional.get(); - WritableMemory memory = memoryResourceHolder.get(); - for (int i = 0; i < ALLOCATOR_SIZE; ++i) { - memory.putByte(i, (byte) 0xFF); - } + return new HeapMemoryAllocator(capacity); } - - @Test - public void testAllocationInMultiplePasses() - { - MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); - - Optional> memoryResourceHolderOptional1 = heapMemoryAllocator.allocate(ALLOCATOR_SIZE - - 4); - Assert.assertTrue(memoryResourceHolderOptional1.isPresent()); - ResourceHolder memoryResourceHolder1 = memoryResourceHolderOptional1.get(); - WritableMemory memory1 = memoryResourceHolder1.get(); - - Optional> memoryResourceHolderOptional2 = heapMemoryAllocator.allocate(4); - Assert.assertTrue(memoryResourceHolderOptional2.isPresent()); - ResourceHolder memoryResourceHolder2 = memoryResourceHolderOptional2.get(); - WritableMemory memory2 = memoryResourceHolder2.get(); - - for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) { - memory1.putByte(i, (byte) 0xFF); - } - for (int i = 0; i < 4; ++i) { - memory2.putByte(i, (byte) 0xFE); - } - // Readback to ensure that value hasn't been overwritten - for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) { - Assert.assertEquals((byte) 0xFF, memory1.getByte(i)); - } - for (int i = 0; i < 4; ++i) { - Assert.assertEquals((byte) 0xFE, memory2.getByte(i)); - } - } - - @Test - public void testOverallocationInSinglePass() - { - MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); - Optional> memoryResourceHolderOptional = - heapMemoryAllocator.allocate(ALLOCATOR_SIZE + 1); - Assert.assertFalse(memoryResourceHolderOptional.isPresent()); - } - - @Test - public void testOverallocationInMultiplePasses() - { - MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); - Optional> memoryResourceHolderOptional = - heapMemoryAllocator.allocate(ALLOCATOR_SIZE - 4); - Assert.assertTrue(memoryResourceHolderOptional.isPresent()); - Assert.assertFalse(heapMemoryAllocator.allocate(5).isPresent()); - } - } diff --git a/processing/src/test/java/org/apache/druid/frame/write/RowBasedFrameWriterTest.java b/processing/src/test/java/org/apache/druid/frame/write/RowBasedFrameWriterTest.java index bb7f9e55878..593004abb6d 100644 --- a/processing/src/test/java/org/apache/druid/frame/write/RowBasedFrameWriterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/write/RowBasedFrameWriterTest.java @@ -20,21 +20,76 @@ package org.apache.druid.frame.write; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.MapBasedRow; +import org.apache.druid.data.input.Row; +import org.apache.druid.frame.Frame; import org.apache.druid.frame.allocation.AppendableMemory; +import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; import org.apache.druid.frame.allocation.HeapMemoryAllocator; import org.apache.druid.frame.field.LongFieldWriter; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.testutil.FrameTestUtil; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.RowAdapters; +import org.apache.druid.segment.RowBasedColumnSelectorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; -public class RowBasedFrameWriterTest +public class RowBasedFrameWriterTest extends InitializedNullHandlingTest { @Test - public void testAddSelectionWithException() + public void test_addSelection_singleLargeRow() + { + final RowSignature signature = + RowSignature.builder() + .add("n", ColumnType.LONG) + .add("s", ColumnType.STRING) + .build(); + + final byte[] largeUtf8 = new byte[990000]; + Arrays.fill(largeUtf8, (byte) 'F'); + final String largeString = StringUtils.fromUtf8(largeUtf8); + final Row largeRow = new MapBasedRow(0L, ImmutableMap.of("n", 3L, "s", largeString)); + + final FrameWriterFactory frameWriterFactory = FrameWriters.makeRowBasedFrameWriterFactory( + new ArenaMemoryAllocatorFactory(1_000_000), + signature, + ImmutableList.of(), + false + ); + + final ColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create( + RowAdapters.standardRow(), + () -> largeRow, + signature, + false, + false + ); + + final Frame frame; + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(columnSelectorFactory)) { + Assert.assertTrue(frameWriter.addSelection()); + frame = Frame.wrap(frameWriter.toByteArray()); + } + + FrameTestUtil.assertRowsEqual( + Sequences.simple(Collections.singletonList(ImmutableList.of(3L, largeString))), + FrameTestUtil.readRowsFromCursorFactory(FrameReader.create(signature).makeCursorFactory(frame)) + ); + } + + @Test + public void test_addSelection_withException() { String colName = "colName"; String errorMsg = "Frame writer exception";