mirror of https://github.com/apache/druid.git
Fixes an issue with AppendableMemory that can cause MSQ jobs to fail (#17369)
This commit is contained in:
parent
dceff89103
commit
5b09329479
|
@ -1805,7 +1805,7 @@ public class MSQWindowTest extends MSQTestBase
|
||||||
.setSql(
|
.setSql(
|
||||||
"select cityName, added, SUM(added) OVER () cc from wikipedia")
|
"select cityName, added, SUM(added) OVER () cc from wikipedia")
|
||||||
.setQueryContext(customContext)
|
.setQueryContext(customContext)
|
||||||
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15922, 200))
|
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15930, 200))
|
||||||
.verifyResults();
|
.verifyResults();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||||
import org.apache.druid.java.util.common.io.Closer;
|
import org.apache.druid.java.util.common.io.Closer;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||||
import org.apache.druid.java.util.http.client.Request;
|
import org.apache.druid.java.util.http.client.Request;
|
||||||
import org.apache.druid.metadata.input.InputSourceModule;
|
import org.apache.druid.metadata.input.InputSourceModule;
|
||||||
import org.apache.druid.msq.counters.CounterNames;
|
import org.apache.druid.msq.counters.CounterNames;
|
||||||
|
@ -159,6 +160,7 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
|
||||||
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
|
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
|
||||||
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
|
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
|
||||||
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
|
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
|
||||||
|
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||||
import org.apache.druid.server.security.AuthConfig;
|
import org.apache.druid.server.security.AuthConfig;
|
||||||
import org.apache.druid.server.security.AuthorizerMapper;
|
import org.apache.druid.server.security.AuthorizerMapper;
|
||||||
import org.apache.druid.sql.DirectStatement;
|
import org.apache.druid.sql.DirectStatement;
|
||||||
|
@ -587,6 +589,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
|
||||||
sqlStatementFactory = CalciteTests.createSqlStatementFactory(engine, plannerFactory);
|
sqlStatementFactory = CalciteTests.createSqlStatementFactory(engine, plannerFactory);
|
||||||
|
|
||||||
authorizerMapper = CalciteTests.TEST_EXTERNAL_AUTHORIZER_MAPPER;
|
authorizerMapper = CalciteTests.TEST_EXTERNAL_AUTHORIZER_MAPPER;
|
||||||
|
|
||||||
|
EmittingLogger.registerEmitter(new NoopServiceEmitter());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected CatalogResolver createMockCatalogResolver()
|
protected CatalogResolver createMockCatalogResolver()
|
||||||
|
|
|
@ -158,11 +158,16 @@ public class AppendableMemory implements Closeable
|
||||||
|
|
||||||
releaseLastBlockIfEmpty();
|
releaseLastBlockIfEmpty();
|
||||||
|
|
||||||
|
final int idx = currentBlockNumber();
|
||||||
|
|
||||||
|
// The request cannot be satisfied by the available bytes in the allocator
|
||||||
if (bytes > allocator.available()) {
|
if (bytes > allocator.available()) {
|
||||||
|
// Check if the last allocated block has enough memory to satisfy the request
|
||||||
|
if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) {
|
||||||
|
// The request cannot be satisfied by the allocator and the last allocated block. Return false
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
final int idx = currentBlockNumber();
|
|
||||||
|
|
||||||
if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) {
|
if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) {
|
||||||
// Allocation needed.
|
// Allocation needed.
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.frame.allocation;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AppendableMemoryTest
|
||||||
|
{
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReserveAdditionalWithLargeLastBlockAndSmallAllocator()
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
This tests the edge case when the last chunk of memory allocated by the allocator has greater available free space
|
||||||
|
than what can be allocated by the allocator. In that case, the availableToReserve call should return the free memory from
|
||||||
|
the last allocated block, and reserveAdditional when called with that value should return true (and not do any additional
|
||||||
|
allocation). This test case assumes a lot about the implementation of the AppendableMemory, but a lot of the assertions made
|
||||||
|
in this test are logical, and should hold true. The final assertion is the most important which checks that the free space
|
||||||
|
in the last block takes precedence over the memory allocator, which should hold true irrespective of the implementation
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Allocator that can allocate atmost 100 bytes and AppendableMemory created with that allocator
|
||||||
|
MemoryAllocator memoryAllocator = new HeapMemoryAllocator(100);
|
||||||
|
AppendableMemory appendableMemory = AppendableMemory.create(memoryAllocator, 10);
|
||||||
|
|
||||||
|
// Reserves a chunk of 10 bytes. The call should return true since the allocator can allocate 100 bytes
|
||||||
|
Assert.assertTrue(appendableMemory.reserveAdditional(10));
|
||||||
|
|
||||||
|
// Last block is empty, the appendable memory is essentially empty
|
||||||
|
Assert.assertEquals(100, appendableMemory.availableToReserve());
|
||||||
|
|
||||||
|
// Advance the cursor so that it is not treated as empty chunk
|
||||||
|
appendableMemory.advanceCursor(4);
|
||||||
|
|
||||||
|
// We should be able to use the remaining 90 bytes from the allocator
|
||||||
|
Assert.assertEquals(90, appendableMemory.availableToReserve());
|
||||||
|
|
||||||
|
// Reserve a chunk of 80 bytes, and advance the cursor so that it is not treated as an empty chunk
|
||||||
|
Assert.assertTrue(appendableMemory.reserveAdditional(80));
|
||||||
|
appendableMemory.advanceCursor(4);
|
||||||
|
|
||||||
|
// At this point, we have 2 chunks with the following (used:free) / total statistics
|
||||||
|
// chunk0 - (4:6)/10
|
||||||
|
// chunk1 - (4:76)/80
|
||||||
|
// The allocator still has 10 additional bytes to reserve
|
||||||
|
|
||||||
|
// Even though the allocator has only 10 bytes, the last chunk has 76 free bytes which can be used. That would take precedence
|
||||||
|
// since that is a larger number
|
||||||
|
Assert.assertEquals(76, appendableMemory.availableToReserve());
|
||||||
|
|
||||||
|
// This assertion must always be true irrespective of the internal implementation
|
||||||
|
Assert.assertTrue(appendableMemory.reserveAdditional(appendableMemory.availableToReserve()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -344,7 +344,7 @@ public class FrameWriterTest extends InitializedNullHandlingTest
|
||||||
for (final FrameWriterTestData.Dataset<?> dataset1 : FrameWriterTestData.DATASETS) {
|
for (final FrameWriterTestData.Dataset<?> dataset1 : FrameWriterTestData.DATASETS) {
|
||||||
for (final FrameWriterTestData.Dataset<?> dataset2 : FrameWriterTestData.DATASETS) {
|
for (final FrameWriterTestData.Dataset<?> dataset2 : FrameWriterTestData.DATASETS) {
|
||||||
final RowSignature signature = makeSignature(Arrays.asList(dataset1, dataset2));
|
final RowSignature signature = makeSignature(Arrays.asList(dataset1, dataset2));
|
||||||
final Sequence<List<Object>> rowSequence = unsortAndMakeRows(Arrays.asList(dataset1, dataset2));
|
final Sequence<List<Object>> rowSequence = unsortAndMakeRows(Arrays.asList(dataset1, dataset2), 1);
|
||||||
|
|
||||||
final List<String> sortColumns = new ArrayList<>();
|
final List<String> sortColumns = new ArrayList<>();
|
||||||
sortColumns.add(signature.getColumnName(0));
|
sortColumns.add(signature.getColumnName(0));
|
||||||
|
@ -378,7 +378,7 @@ public class FrameWriterTest extends InitializedNullHandlingTest
|
||||||
// Test every possible capacity, up to the amount required to write all items from every list.
|
// Test every possible capacity, up to the amount required to write all items from every list.
|
||||||
Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType == FrameType.COLUMNAR);
|
Assume.assumeFalse(inputFrameType == FrameType.COLUMNAR || outputFrameType == FrameType.COLUMNAR);
|
||||||
final RowSignature signature = makeSignature(FrameWriterTestData.DATASETS);
|
final RowSignature signature = makeSignature(FrameWriterTestData.DATASETS);
|
||||||
final Sequence<List<Object>> rowSequence = unsortAndMakeRows(FrameWriterTestData.DATASETS);
|
final Sequence<List<Object>> rowSequence = unsortAndMakeRows(FrameWriterTestData.DATASETS, 3);
|
||||||
final int totalRows = rowSequence.toList().size();
|
final int totalRows = rowSequence.toList().size();
|
||||||
|
|
||||||
final List<String> sortColumns = new ArrayList<>();
|
final List<String> sortColumns = new ArrayList<>();
|
||||||
|
@ -648,7 +648,10 @@ public class FrameWriterTest extends InitializedNullHandlingTest
|
||||||
/**
|
/**
|
||||||
* Create rows out of shuffled (unsorted) datasets.
|
* Create rows out of shuffled (unsorted) datasets.
|
||||||
*/
|
*/
|
||||||
private static Sequence<List<Object>> unsortAndMakeRows(final List<FrameWriterTestData.Dataset<?>> datasets)
|
private static Sequence<List<Object>> unsortAndMakeRows(
|
||||||
|
final List<FrameWriterTestData.Dataset<?>> datasets,
|
||||||
|
final int multiplicationFactor
|
||||||
|
)
|
||||||
{
|
{
|
||||||
final List<List<Object>> retVal = new ArrayList<>();
|
final List<List<Object>> retVal = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -672,7 +675,12 @@ public class FrameWriterTest extends InitializedNullHandlingTest
|
||||||
retVal.add(row);
|
retVal.add(row);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Sequences.simple(retVal);
|
List<List<Object>> multipliedRetVal = new ArrayList<>();
|
||||||
|
for (int i = 0; i < multiplicationFactor; ++i) {
|
||||||
|
multipliedRetVal.addAll(retVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Sequences.simple(multipliedRetVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue