diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index effca1b06f7..aa258859b50 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -1805,7 +1805,7 @@ public class MSQWindowTest extends MSQTestBase .setSql( "select cityName, added, SUM(added) OVER () cc from wikipedia") .setQueryContext(customContext) - .setExpectedMSQFault(new TooManyRowsInAWindowFault(15922, 200)) + .setExpectedMSQFault(new TooManyRowsInAWindowFault(15930, 200)) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index cbaae8f1b8e..62496abacf6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.msq.counters.CounterNames; @@ -159,6 +160,7 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.sql.DirectStatement; @@ -587,6 +589,8 @@ public class MSQTestBase extends BaseCalciteQueryTest sqlStatementFactory = CalciteTests.createSqlStatementFactory(engine, plannerFactory); authorizerMapper = CalciteTests.TEST_EXTERNAL_AUTHORIZER_MAPPER; + + EmittingLogger.registerEmitter(new NoopServiceEmitter()); } protected CatalogResolver createMockCatalogResolver() diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java index 8d961f298ce..e66871186c0 100644 --- a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java +++ b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java @@ -158,12 +158,17 @@ public class AppendableMemory implements Closeable releaseLastBlockIfEmpty(); - if (bytes > allocator.available()) { - return false; - } - final int idx = currentBlockNumber(); + // The request cannot be satisfied by the available bytes in the allocator + if (bytes > allocator.available()) { + // Check if the last allocated block has enough memory to satisfy the request + if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) { + // The request cannot be satisfied by the allocator and the last allocated block. Return false + return false; + } + } + if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) { // Allocation needed. // Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE. diff --git a/processing/src/test/java/org/apache/druid/frame/allocation/AppendableMemoryTest.java b/processing/src/test/java/org/apache/druid/frame/allocation/AppendableMemoryTest.java new file mode 100644 index 00000000000..eca9f135c87 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/allocation/AppendableMemoryTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.allocation; + +import org.junit.Assert; +import org.junit.Test; + +public class AppendableMemoryTest +{ + + @Test + public void testReserveAdditionalWithLargeLastBlockAndSmallAllocator() + { + /* + This tests the edge case when the last chunk of memory allocated by the allocator has greater available free space + than what can be allocated by the allocator. In that case, the availableToReserve call should return the free memory from + the last allocated block, and reserveAdditional when called with that value should return true (and not do any additional + allocation). This test case assumes a lot about the implementation of the AppendableMemory, but a lot of the assertions made + in this test are logical, and should hold true. The final assertion is the most important which checks that the free space + in the last block takes precedence over the memory allocator, which should hold true irrespective of the implementation + */ + + // Allocator that can allocate atmost 100 bytes and AppendableMemory created with that allocator + MemoryAllocator memoryAllocator = new HeapMemoryAllocator(100); + AppendableMemory appendableMemory = AppendableMemory.create(memoryAllocator, 10); + + // Reserves a chunk of 10 bytes. The call should return true since the allocator can allocate 100 bytes + Assert.assertTrue(appendableMemory.reserveAdditional(10)); + + // Last block is empty, the appendable memory is essentially empty + Assert.assertEquals(100, appendableMemory.availableToReserve()); + + // Advance the cursor so that it is not treated as empty chunk + appendableMemory.advanceCursor(4); + + // We should be able to use the remaining 90 bytes from the allocator + Assert.assertEquals(90, appendableMemory.availableToReserve()); + + // Reserve a chunk of 80 bytes, and advance the cursor so that it is not treated as an empty chunk + Assert.assertTrue(appendableMemory.reserveAdditional(80)); + appendableMemory.advanceCursor(4); + + // At this point, we have 2 chunks with the following (used:free) / total statistics + // chunk0 - (4:6)/10 + // chunk1 - (4:76)/80 + // The allocator still has 10 additional bytes to reserve + + // Even though the allocator has only 10 bytes, the last chunk has 76 free bytes which can be used. That would take precedence + // since that is a larger number + Assert.assertEquals(76, appendableMemory.availableToReserve()); + + // This assertion must always be true irrespective of the internal implementation + Assert.assertTrue(appendableMemory.reserveAdditional(appendableMemory.availableToReserve())); + } + +} diff --git a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java index 3103348284b..073b83dbe22 100644 --- a/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java +++ b/processing/src/test/java/org/apache/druid/frame/write/FrameWriterTest.java @@ -344,7 +344,7 @@ public class FrameWriterTest extends InitializedNullHandlingTest for (final FrameWriterTestData.Dataset dataset1 : FrameWriterTestData.DATASETS) { for (final FrameWriterTestData.Dataset dataset2 : FrameWriterTestData.DATASETS) { final RowSignature signature = makeSignature(Arrays.asList(dataset1, dataset2)); - final Sequence> rowSequence = unsortAndMakeRows(Arrays.asList(dataset1, dataset2)); + final Sequence> rowSequence = unsortAndMakeRows(Arrays.asList(dataset1, dataset2), 1); final List sortColumns = new ArrayList<>(); sortColumns.add(signature.getColumnName(0)); @@ -378,7 +378,7 @@ public class FrameWriterTest extends InitializedNullHandlingTest // Test every possible capacity, up to the amount required to write all items from every list. Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType == FrameType.COLUMNAR); final RowSignature signature = makeSignature(FrameWriterTestData.DATASETS); - final Sequence> rowSequence = unsortAndMakeRows(FrameWriterTestData.DATASETS); + final Sequence> rowSequence = unsortAndMakeRows(FrameWriterTestData.DATASETS, 3); final int totalRows = rowSequence.toList().size(); final List sortColumns = new ArrayList<>(); @@ -648,7 +648,10 @@ public class FrameWriterTest extends InitializedNullHandlingTest /** * Create rows out of shuffled (unsorted) datasets. */ - private static Sequence> unsortAndMakeRows(final List> datasets) + private static Sequence> unsortAndMakeRows( + final List> datasets, + final int multiplicationFactor + ) { final List> retVal = new ArrayList<>(); @@ -672,7 +675,12 @@ public class FrameWriterTest extends InitializedNullHandlingTest retVal.add(row); } - return Sequences.simple(retVal); + List> multipliedRetVal = new ArrayList<>(); + for (int i = 0; i < multiplicationFactor; ++i) { + multipliedRetVal.addAll(retVal); + } + + return Sequences.simple(multipliedRetVal); } /**