mirror of https://github.com/apache/druid.git
The "suggested server memory" figure needs to take into account maxConcurrentStages. The fix here does not affect the main memory calculations, but it does affect the accuracy of error messages. Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
This commit is contained in:
parent
4a8008aebb
commit
cf44747c4d
|
@ -215,7 +215,7 @@ public class WorkerMemoryParameters
|
|||
|
||||
final long minimumBundleFreeMemory = computeMinimumBundleFreeMemory(frameSize, numFramesPerOutputChannel);
|
||||
if (bundleFreeMemory < minimumBundleFreeMemory) {
|
||||
final long requiredTaskMemory = bundleMemory - bundleFreeMemory + minimumBundleFreeMemory;
|
||||
final long requiredTaskMemory = (bundleMemory - bundleFreeMemory + minimumBundleFreeMemory) * maxConcurrentStages;
|
||||
throw new MSQException(
|
||||
new NotEnoughMemoryFault(
|
||||
memoryIntrospector.computeJvmMemoryRequiredForTaskMemory(requiredTaskMemory),
|
||||
|
|
|
@ -281,6 +281,55 @@ public class WorkerMemoryParametersTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_1WorkerInJvm_alone_40Threads_2ConcurrentStages()
|
||||
{
|
||||
final int numThreads = 40;
|
||||
final int frameSize = WorkerMemoryParameters.DEFAULT_FRAME_SIZE;
|
||||
|
||||
final MemoryIntrospectorImpl memoryIntrospector = createMemoryIntrospector(1_250_000_000, 1, numThreads);
|
||||
final List<InputSlice> slices = makeInputSlices(ReadablePartitions.striped(0, 1, numThreads));
|
||||
final IntSet broadcastInputs = IntSets.emptySet();
|
||||
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();
|
||||
|
||||
final MSQException e = Assert.assertThrows(
|
||||
MSQException.class,
|
||||
() -> WorkerMemoryParameters.createInstance(
|
||||
memoryIntrospector,
|
||||
frameSize,
|
||||
slices,
|
||||
broadcastInputs,
|
||||
shuffleSpec,
|
||||
2,
|
||||
1
|
||||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
new NotEnoughMemoryFault(2_732_500_000L, 1_250_000_000, 1_000_000_000, 1, 40, 1, 2),
|
||||
e.getFault()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_1WorkerInJvm_alone_40Threads_2ConcurrentStages_memoryFromError()
|
||||
{
|
||||
// Test with the amount of memory recommended in the error message from
|
||||
// test_1WorkerInJvm_alone_40Threads_2ConcurrentStages.
|
||||
final int numThreads = 40;
|
||||
final int frameSize = WorkerMemoryParameters.DEFAULT_FRAME_SIZE;
|
||||
|
||||
final MemoryIntrospectorImpl memoryIntrospector = createMemoryIntrospector(2_732_500_000L, 1, numThreads);
|
||||
final List<InputSlice> slices = makeInputSlices(ReadablePartitions.striped(0, 1, numThreads));
|
||||
final IntSet broadcastInputs = IntSets.emptySet();
|
||||
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();
|
||||
|
||||
Assert.assertEquals(
|
||||
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 250_000, 0),
|
||||
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_1WorkerInJvm_200WorkersInCluster_4Threads()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue