mirror of https://github.com/apache/druid.git
MSQ: Fix calculation of suggested memory in WorkerMemoryParameters. (#17108)
The "suggested server memory" figure needs to take into account maxConcurrentStages. The fix here does not affect the main memory calculations, but it does affect the accuracy of error messages.
This commit is contained in:
parent
39723e5401
commit
ca0cb64ee8
|
@ -215,7 +215,7 @@ public class WorkerMemoryParameters
|
|||
|
||||
final long minimumBundleFreeMemory = computeMinimumBundleFreeMemory(frameSize, numFramesPerOutputChannel);
|
||||
if (bundleFreeMemory < minimumBundleFreeMemory) {
|
||||
final long requiredTaskMemory = bundleMemory - bundleFreeMemory + minimumBundleFreeMemory;
|
||||
final long requiredTaskMemory = (bundleMemory - bundleFreeMemory + minimumBundleFreeMemory) * maxConcurrentStages;
|
||||
throw new MSQException(
|
||||
new NotEnoughMemoryFault(
|
||||
memoryIntrospector.computeJvmMemoryRequiredForTaskMemory(requiredTaskMemory),
|
||||
|
|
|
@ -281,6 +281,55 @@ public class WorkerMemoryParametersTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_1WorkerInJvm_alone_40Threads_2ConcurrentStages()
|
||||
{
|
||||
final int numThreads = 40;
|
||||
final int frameSize = WorkerMemoryParameters.DEFAULT_FRAME_SIZE;
|
||||
|
||||
final MemoryIntrospectorImpl memoryIntrospector = createMemoryIntrospector(1_250_000_000, 1, numThreads);
|
||||
final List<InputSlice> slices = makeInputSlices(ReadablePartitions.striped(0, 1, numThreads));
|
||||
final IntSet broadcastInputs = IntSets.emptySet();
|
||||
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();
|
||||
|
||||
final MSQException e = Assert.assertThrows(
|
||||
MSQException.class,
|
||||
() -> WorkerMemoryParameters.createInstance(
|
||||
memoryIntrospector,
|
||||
frameSize,
|
||||
slices,
|
||||
broadcastInputs,
|
||||
shuffleSpec,
|
||||
2,
|
||||
1
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
new NotEnoughMemoryFault(2_732_500_000L, 1_250_000_000, 1_000_000_000, 1, 40, 1, 2),
|
||||
e.getFault()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_1WorkerInJvm_alone_40Threads_2ConcurrentStages_memoryFromError()
|
||||
{
|
||||
// Test with the amount of memory recommended in the error message from
|
||||
// test_1WorkerInJvm_alone_40Threads_2ConcurrentStages.
|
||||
final int numThreads = 40;
|
||||
final int frameSize = WorkerMemoryParameters.DEFAULT_FRAME_SIZE;
|
||||
|
||||
final MemoryIntrospectorImpl memoryIntrospector = createMemoryIntrospector(2_732_500_000L, 1, numThreads);
|
||||
final List<InputSlice> slices = makeInputSlices(ReadablePartitions.striped(0, 1, numThreads));
|
||||
final IntSet broadcastInputs = IntSets.emptySet();
|
||||
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();
|
||||
|
||||
Assert.assertEquals(
|
||||
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 250_000, 0),
|
||||
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_1WorkerInJvm_200WorkersInCluster_4Threads()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue