Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown. (#13846)

* Suggested memory calculation in case NOT_ENOUGH_MEMORY_FAULT is thrown.

Co-authored-by: Charles Smith <techdocsmith@gmail.com>
This commit is contained in:
Karan Kumar 2023-03-06 18:00:36 +05:30 committed by GitHub
parent 65c3954942
commit 94cfabea18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 155 additions and 50 deletions

View File

@ -690,7 +690,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_TooManyColumns">`TooManyColumns`</a> | Exceeded the maximum number of columns for a stage (2,000 columns). | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum number of simultaneously-running workers. See the [Limits](#limits) table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1,000 workers), then you can increase the limit by adding more memory to each task. |
| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `suggestedServerMemory`: Suggested number of bytes of memory to allocate to a given process. <br /><br />`serverMemory`: The number of bytes of memory available to a single process.<br /><br />`usableMemory`: The number of usable bytes of memory for a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
| <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. |
| <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
| <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` |

View File

@ -19,6 +19,7 @@
package org.apache.druid.msq.exec;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.inject.Injector;
import it.unimi.dsi.fastutil.ints.IntSet;
@ -128,6 +129,10 @@ public class WorkerMemoryParameters
* we use a value somewhat lower than 0.5.
*/
static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
/**
* In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation overhead is added when estimating total memory required for the process.
*/
private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
private final int superSorterMaxActiveProcessors;
private final int superSorterMaxChannelsPerProcessor;
@ -155,12 +160,13 @@ public class WorkerMemoryParameters
*/
public static WorkerMemoryParameters createProductionInstanceForController(final Injector injector)
{
long totalLookupFootprint = computeTotalLookupFootprint(injector);
return createInstance(
Runtime.getRuntime().maxMemory(),
computeUsableMemoryInJvm(injector),
computeNumWorkersInJvm(injector),
computeNumProcessorsInJvm(injector),
0
0,
totalLookupFootprint
);
}
@ -179,13 +185,14 @@ public class WorkerMemoryParameters
inputStageNumbers.intStream()
.map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
.sum();
long totalLookupFootprint = computeTotalLookupFootprint(injector);
return createInstance(
Runtime.getRuntime().maxMemory(),
computeUsableMemoryInJvm(injector),
computeNumWorkersInJvm(injector),
computeNumProcessorsInJvm(injector),
numInputWorkers
numInputWorkers,
totalLookupFootprint
);
}
@ -200,15 +207,30 @@ public class WorkerMemoryParameters
* the task capacity.
* @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
* @param numInputWorkers number of workers across input stages that need to be merged together.
* @param totalLookUpFootprint estimated size of the lookups loaded by the process.
*/
public static WorkerMemoryParameters createInstance(
final long maxMemoryInJvm,
final long usableMemoryInJvm,
final int numWorkersInJvm,
final int numProcessingThreadsInJvm,
final int numInputWorkers
final int numInputWorkers,
final long totalLookUpFootprint
)
{
Preconditions.checkArgument(maxMemoryInJvm > 0, "Max memory passed: [%s] should be > 0", maxMemoryInJvm);
Preconditions.checkArgument(numWorkersInJvm > 0, "Number of workers: [%s] in jvm should be > 0", numWorkersInJvm);
Preconditions.checkArgument(
numProcessingThreadsInJvm > 0,
"Number of processing threads [%s] should be > 0",
numProcessingThreadsInJvm
);
Preconditions.checkArgument(numInputWorkers >= 0, "Number of input workers: [%s] should be >=0", numInputWorkers);
Preconditions.checkArgument(
totalLookUpFootprint >= 0,
"Lookup memory footprint: [%s] should be >= 0",
totalLookUpFootprint
);
final long usableMemoryInJvm = computeUsableMemoryInJvm(maxMemoryInJvm, totalLookUpFootprint);
final long workerMemory = memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
final long bundleMemory = memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
final long bundleMemoryForInputChannels = memoryNeededForInputChannels(numInputWorkers);
@ -223,6 +245,12 @@ public class WorkerMemoryParameters
// Not enough memory for even one worker. More of a NotEnoughMemory situation than a TooManyWorkers situation.
throw new MSQException(
new NotEnoughMemoryFault(
calculateSuggestedMinMemoryFromUsableMemory(
estimateUsableMemory(
numWorkersInJvm,
numProcessingThreadsInJvm,
PROCESSING_MINIMUM_BYTES + BUFFER_BYTES_FOR_ESTIMATION + bundleMemoryForInputChannels
), totalLookUpFootprint),
maxMemoryInJvm,
usableMemoryInJvm,
numWorkersInJvm,
@ -238,6 +266,13 @@ public class WorkerMemoryParameters
if (maxNumFramesForSuperSorter < MIN_SUPER_SORTER_FRAMES) {
throw new MSQException(
new NotEnoughMemoryFault(
calculateSuggestedMinMemoryFromUsableMemory(
estimateUsableMemory(
numWorkersInJvm,
(MIN_SUPER_SORTER_FRAMES + BUFFER_BYTES_FOR_ESTIMATION) * LARGE_FRAME_SIZE
),
totalLookUpFootprint
),
maxMemoryInJvm,
usableMemoryInJvm,
numWorkersInJvm,
@ -412,7 +447,7 @@ public class WorkerMemoryParameters
}
/**
* Compute the memory allocated to each processing bundle.
* Compute the memory allocated to each processing bundle. Any computation changes done to this method should also be done in its corresponding method {@link WorkerMemoryParameters#estimateUsableMemory(int, int, long)}
*/
private static long memoryPerBundle(
final long usableMemoryInJvm,
@ -431,6 +466,32 @@ public class WorkerMemoryParameters
return memoryForBundles / bundleCount;
}
/**
* Used for estimating the usable memory for better exception messages when {@link NotEnoughMemoryFault} is thrown.
*/
private static long estimateUsableMemory(
final int numWorkersInJvm,
final int numProcessingThreadsInJvm,
final long estimatedEachBundleMemory
)
{
final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
return estimateUsableMemory(numWorkersInJvm, estimatedEachBundleMemory * bundleCount);
}
/**
* Add overheads to the estimated bundle memoery for all the workers. Checkout {@link WorkerMemoryParameters#memoryPerWorker(long, int)}
* for the overhead calculation outside the processing bundles.
*/
private static long estimateUsableMemory(final int numWorkersInJvm, final long estimatedTotalBundleMemory)
{
// Currently, we only add the partition stats overhead since it will be the single largest overhead per worker.
final long estimateStatOverHeadPerWorker = PARTITION_STATS_MEMORY_MAX_BYTES;
return estimatedTotalBundleMemory + (estimateStatOverHeadPerWorker * numWorkersInJvm);
}
private static long memoryNeededForInputChannels(final int numInputWorkers)
{
// Workers that read sorted inputs must open all channels at once to do an N-way merge. Calculate memory needs.
@ -439,11 +500,20 @@ public class WorkerMemoryParameters
}
/**
* Amount of heap memory available for our usage.
* Amount of heap memory available for our usage. Any computation changes done to this method should also be done in its corresponding method {@link WorkerMemoryParameters#calculateSuggestedMinMemoryFromUsableMemory}
*/
private static long computeUsableMemoryInJvm(final Injector injector)
private static long computeUsableMemoryInJvm(final long maxMemory, final long totalLookupFootprint)
{
return (long) ((Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint(injector)) * USABLE_MEMORY_FRACTION);
// since lookups are essentially in memory hashmap's, the object overhead is trivial hence its subtracted prior to usable memory calculations.
return (long) ((maxMemory - totalLookupFootprint) * USABLE_MEMORY_FRACTION);
}
/**
* Estimate amount of heap memory for the given workload to use in case usable memory is provided. This method is used for better exception messages when {@link NotEnoughMemoryFault} is thrown.
*/
private static long calculateSuggestedMinMemoryFromUsableMemory(long usuableMemeory, final long totalLookupFootprint)
{
return (long) ((usuableMemeory / USABLE_MEMORY_FRACTION) + totalLookupFootprint);
}
/**

View File

@ -30,6 +30,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
{
static final String CODE = "NotEnoughMemory";
private final long suggestedServerMemory;
private final long serverMemory;
private final long usableMemory;
private final int serverWorkers;
@ -37,6 +38,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
@JsonCreator
public NotEnoughMemoryFault(
@JsonProperty("suggestedServerMemory") final long suggestedServerMemory,
@JsonProperty("serverMemory") final long serverMemory,
@JsonProperty("usableMemory") final long usableMemory,
@JsonProperty("serverWorkers") final int serverWorkers,
@ -45,19 +47,28 @@ public class NotEnoughMemoryFault extends BaseMSQFault
{
super(
CODE,
"Not enough memory (total = %,d; usable = %,d; server workers = %,d; server threads = %,d)",
"Not enough memory. Required al teast %,d bytes. (total = %,d bytes; usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM memory with the -xmx option"
+ (serverWorkers > 1 ? " or reduce number of server workers" : ""),
suggestedServerMemory,
serverMemory,
usableMemory,
serverWorkers,
serverThreads
);
this.suggestedServerMemory = suggestedServerMemory;
this.serverMemory = serverMemory;
this.usableMemory = usableMemory;
this.serverWorkers = serverWorkers;
this.serverThreads = serverThreads;
}
@JsonProperty
public long getSuggestedServerMemory()
{
return suggestedServerMemory;
}
@JsonProperty
public long getServerMemory()
{
@ -95,25 +106,35 @@ public class NotEnoughMemoryFault extends BaseMSQFault
return false;
}
NotEnoughMemoryFault that = (NotEnoughMemoryFault) o;
return serverMemory == that.serverMemory
&& usableMemory == that.usableMemory
&& serverWorkers == that.serverWorkers
&& serverThreads == that.serverThreads;
return
suggestedServerMemory == that.suggestedServerMemory
&& serverMemory == that.serverMemory
&& usableMemory == that.usableMemory
&& serverWorkers == that.serverWorkers
&& serverThreads == that.serverThreads;
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), serverMemory, usableMemory, serverWorkers, serverThreads);
return Objects.hash(
super.hashCode(),
suggestedServerMemory,
serverMemory,
usableMemory,
serverWorkers,
serverThreads
);
}
@Override
public String toString()
{
return "NotEnoughMemoryFault{" +
"serverMemory=" + serverMemory +
", usableMemory=" + usableMemory +
", serverWorkers=" + serverWorkers +
"suggestedServerMemory=" + suggestedServerMemory +
" bytes, serverMemory=" + serverMemory +
" bytes, usableMemory=" + usableMemory +
" bytes, serverWorkers=" + serverWorkers +
", serverThreads=" + serverThreads +
'}';
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
import org.junit.Assert;
@ -31,29 +32,34 @@ public class WorkerMemoryParametersTest
@Test
public void test_oneWorkerInJvm_alone()
{
Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1));
Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1));
Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1));
Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1));
Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1));
Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1, 0));
Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1, 0));
Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1, 0));
Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1, 0));
Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1, 0));
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> compute(1_000_000_000, 1, 32, 1)
() -> compute(1_000_000_000, 1, 32, 1, 0)
);
Assert.assertEquals(new NotEnoughMemoryFault(1_588_044_000, 1_000_000_000, 750_000_000, 1, 32), e.getFault());
final MSQFault fault = Assert.assertThrows(MSQException.class, () -> compute(1_000_000_000, 2, 32, 1, 0))
.getFault();
Assert.assertEquals(new NotEnoughMemoryFault(2024045333, 1_000_000_000, 750_000_000, 2, 32), fault);
Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 750_000_000, 1, 32), e.getFault());
}
@Test
public void test_oneWorkerInJvm_twoHundredWorkersInCluster()
{
Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200));
Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200));
Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200, 0));
Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200, 0));
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> compute(1_000_000_000, 1, 4, 200)
() -> compute(1_000_000_000, 1, 4, 200, 0)
);
Assert.assertEquals(new TooManyWorkersFault(200, 109), e.getFault());
@ -62,32 +68,39 @@ public class WorkerMemoryParametersTest
@Test
public void test_fourWorkersInJvm_twoHundredWorkersInCluster()
{
Assert.assertEquals(parameters(1, 150, 679_380_000, 304_200_000, 168_750_000), compute(9_000_000_000L, 4, 1, 200));
Assert.assertEquals(parameters(2, 62, 543_705_000, 243_450_000, 168_750_000), compute(9_000_000_000L, 4, 2, 200));
Assert.assertEquals(parameters(4, 22, 374_111_250, 167_512_500, 168_750_000), compute(9_000_000_000L, 4, 4, 200));
Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200));
Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200));
Assert.assertEquals(
parameters(1, 150, 679_380_000, 304_200_000, 168_750_000),
compute(9_000_000_000L, 4, 1, 200, 0)
);
Assert.assertEquals(
parameters(2, 62, 543_705_000, 243_450_000, 168_750_000),
compute(9_000_000_000L, 4, 2, 200, 0)
);
Assert.assertEquals(
parameters(4, 22, 374_111_250, 167_512_500, 168_750_000),
compute(9_000_000_000L, 4, 4, 200, 0)
);
Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200, 0));
Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200, 0));
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> compute(8_000_000_000L, 4, 32, 200)
() -> compute(8_000_000_000L, 4, 32, 200, 0)
);
Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
// Make sure 107 actually works. (Verify the error message above.)
Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107));
Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107, 0));
}
@Test
public void test_oneWorkerInJvm_negativeUsableMemory()
{
final MSQException e = Assert.assertThrows(
MSQException.class,
Exception e = Assert.assertThrows(
IllegalArgumentException.class,
() -> WorkerMemoryParameters.createInstance(100, -50, 1, 32, 1)
);
Assert.assertEquals(new NotEnoughMemoryFault(100, -50, 1, 32), e.getFault());
}
@Test
@ -117,15 +130,16 @@ public class WorkerMemoryParametersTest
final long maxMemoryInJvm,
final int numWorkersInJvm,
final int numProcessingThreadsInJvm,
final int numInputWorkers
final int numInputWorkers,
final int totalLookUpFootprint
)
{
return WorkerMemoryParameters.createInstance(
maxMemoryInJvm,
(long) (maxMemoryInJvm * WorkerMemoryParameters.USABLE_MEMORY_FRACTION),
numWorkersInJvm,
numProcessingThreadsInJvm,
numInputWorkers
numInputWorkers,
totalLookUpFootprint
);
}
}

View File

@ -60,7 +60,7 @@ public class MSQFaultSerdeTest
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
assertFaultSerde(new InvalidNullByteFault("the column"));
assertFaultSerde(new NotEnoughMemoryFault(1000, 900, 1, 2));
assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
assertFaultSerde(new RowTooLargeFault(1000));
assertFaultSerde(new TaskStartTimeoutFault(10));

View File

@ -72,11 +72,11 @@ public class CalciteSelectQueryTestMSQ extends CalciteQueryTest
{
final WorkerMemoryParameters workerMemoryParameters =
WorkerMemoryParameters.createInstance(
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
2
2,
0
);
indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,

View File

@ -282,11 +282,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
private TestGroupByBuffers groupByBuffers;
protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(
WorkerMemoryParameters.createInstance(
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
2
2,
0
)
);

View File

@ -207,7 +207,7 @@ public abstract class QueryResultPusher
resultsWriter.recordFailure(e);
// This case is always a failure because the error happened mid-stream of sending results back. Therefore,
// we do not believe that the response stream was actually useable
// we do not believe that the response stream was actually usable
counter.incrementFailed();
return null;
}