Add maxInputBytesPerWorker as query context parameter (#13707)

* Add maxInputBytesPerWorker as query context parameter

* Move documentation to MSQ-specific docs

* Update tests

* Spacing

* Address review comments

* Fix test

* Update docs/multi-stage-query/reference.md

* Correct spelling mistake

---------

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
Adarsh Sanjeev 2023-01-31 20:55:28 +05:30 committed by GitHub
parent 698670c88e
commit 51dfde0284
9 changed files with 62 additions and 10 deletions

View File

@@ -602,6 +602,7 @@ The following table lists the context parameters for the MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `composedIntermediateSuperSorterStorageEnabled` | SELECT, INSERT, REPLACE<br /><br /> Whether to enable automatic fallback to durable storage from local storage for sorting's intermediate data. Requires to setup `intermediateSuperSorterStorageMaxLocalBytes` limit for local storage and durable shuffle storage feature as well.| `false` |
| `intermediateSuperSorterStorageMaxLocalBytes` | SELECT, INSERT, REPLACE<br /><br /> Whether to enable a byte limit on local storage for sorting's intermediate data. If that limit is crossed, the task fails with `ResourceLimitExceededException`.| `9223372036854775807` |
+| `maxInputBytesPerWorker` | Should be used in conjunction with taskAssignment `auto` mode. When dividing the input of a stage among the workers, this parameter determines the maximum number of bytes given to a single worker before the next worker is chosen. It is only used as a guideline during input slicing and does not guarantee that a worker's input cannot be larger. For example, given three files of 3, 7, and 12 GB, two workers would be used: worker 1 gets the 3 GB and 7 GB files, and worker 2 gets the 12 GB file. This value is used for all stages in a query. | `10737418240` |
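
To make the slicing guideline above concrete, here is a minimal, hypothetical Java sketch of the greedy byte-based packing it describes. The class and method names (`SliceExample`, `slice`) and the use of raw byte counts are illustrative assumptions; Druid's actual logic operates on input splits inside its `InputSpecSlicer` implementations.

```java
import java.util.ArrayList;
import java.util.List;

public class SliceExample
{
  // Greedily pack file sizes into workers, starting a new worker once the
  // current one would exceed maxInputBytesPerWorker.
  static List<List<Long>> slice(long[] fileSizes, long maxInputBytesPerWorker)
  {
    final List<List<Long>> workers = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : fileSizes) {
      if (!current.isEmpty() && currentBytes + size > maxInputBytesPerWorker) {
        workers.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      workers.add(current);
    }
    return workers;
  }

  public static void main(String[] args)
  {
    final long gb = 1024L * 1024 * 1024;
    // 3 GB, 7 GB, and 12 GB files with a 10 GB limit: worker 1 gets [3, 7],
    // worker 2 gets [12].
    System.out.println(slice(new long[]{3 * gb, 7 * gb, 12 * gb}, 10 * gb));
  }
}
```

Note how the 12 GB file still lands on a single worker even though it exceeds the 10 GB limit: a file larger than the limit is never split, which is why the parameter is a guideline rather than a hard cap.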
## Sketch Merging Mode

This section details the advantages and performance of various Cluster By Statistics Merge Modes.

View File

@@ -1599,10 +1599,14 @@ public class ControllerImpl implements Controller
      final DataSchema dataSchema =
          generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
+     final long maxInputBytesPerWorker =
+         MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getQuery().context());
      builder.add(
          StageDefinition.builder(queryDef.getNextStageNumber())
                         .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
                         .maxWorkerCount(tuningConfig.getMaxNumWorkers())
+                        .maxInputBytesPerWorker(maxInputBytesPerWorker)
                         .processorFactory(
                             new SegmentGeneratorFrameProcessorFactory(
                                 dataSchema,

View File

@@ -62,7 +62,7 @@ public class Limits
  /**
   * Maximum number of input bytes per worker in case number of tasks is determined automatically.
   */
- public static final long MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
+ public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
  /**
   * Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.

View File

@@ -33,6 +33,7 @@ import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@@ -85,6 +86,7 @@ public class StageDefinition
  private final FrameProcessorFactory processorFactory;
  private final RowSignature signature;
  private final int maxWorkerCount;
+ private final long maxInputBytesPerWorker;
  private final boolean shuffleCheckHasMultipleValues;
  @Nullable
@@ -102,7 +104,8 @@ public class StageDefinition
      @JsonProperty("signature") final RowSignature signature,
      @Nullable @JsonProperty("shuffleSpec") final ShuffleSpec shuffleSpec,
      @JsonProperty("maxWorkerCount") final int maxWorkerCount,
-     @JsonProperty("shuffleCheckHasMultipleValues") final boolean shuffleCheckHasMultipleValues
+     @JsonProperty("shuffleCheckHasMultipleValues") final boolean shuffleCheckHasMultipleValues,
+     @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
  )
  {
    this.id = Preconditions.checkNotNull(id, "id");
@@ -122,6 +125,8 @@ public class StageDefinition
    this.maxWorkerCount = maxWorkerCount;
    this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
    this.frameReader = Suppliers.memoize(() -> FrameReader.create(signature))::get;
+   this.maxInputBytesPerWorker = maxInputBytesPerWorker == null ?
+                                 Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER : maxInputBytesPerWorker;
    if (shuffleSpec != null && shuffleSpec.needsStatistics() && shuffleSpec.getClusterBy().getColumns().isEmpty()) {
      throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", shuffleSpec);
@@ -241,6 +246,12 @@ public class StageDefinition
    return maxWorkerCount;
  }
+ @JsonProperty
+ public long getMaxInputBytesPerWorker()
+ {
+   return maxInputBytesPerWorker;
+ }
  @JsonProperty("shuffleCheckHasMultipleValues")
  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
  boolean getShuffleCheckHasMultipleValues()
@@ -330,7 +341,8 @@ public class StageDefinition
           && Objects.equals(broadcastInputNumbers, that.broadcastInputNumbers)
           && Objects.equals(processorFactory, that.processorFactory)
           && Objects.equals(signature, that.signature)
-          && Objects.equals(shuffleSpec, that.shuffleSpec);
+          && Objects.equals(shuffleSpec, that.shuffleSpec)
+          && Objects.equals(maxInputBytesPerWorker, that.maxInputBytesPerWorker);
  }
  @Override
@@ -344,7 +356,8 @@ public class StageDefinition
        signature,
        maxWorkerCount,
        shuffleCheckHasMultipleValues,
-       shuffleSpec
+       shuffleSpec,
+       maxInputBytesPerWorker
    );
  }
@@ -360,6 +373,7 @@ public class StageDefinition
           ", maxWorkerCount=" + maxWorkerCount +
           ", shuffleSpec=" + shuffleSpec +
           (shuffleCheckHasMultipleValues ? ", shuffleCheckHasMultipleValues=" + shuffleCheckHasMultipleValues : "") +
+          ", maxInputBytesPerWorker=" + maxInputBytesPerWorker +
           '}';
  }
}
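
A note on the constructor change above: `maxInputBytesPerWorker` is accepted as a boxed `Long` so that stage definitions serialized before this commit, which lack the field, still deserialize cleanly and pick up the default. A self-contained sketch of that pattern, using a hypothetical `CompatExample` class rather than `StageDefinition` itself:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CompatExample
{
  private static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10L * 1024 * 1024 * 1024;

  private final long maxInputBytesPerWorker;

  // When the JSON predates the new field, Jackson passes null for the boxed
  // Long and the constructor substitutes the default.
  @JsonCreator
  public CompatExample(
      @JsonProperty("maxInputBytesPerWorker") final Long maxInputBytesPerWorker
  )
  {
    this.maxInputBytesPerWorker = maxInputBytesPerWorker == null
                                  ? DEFAULT_MAX_INPUT_BYTES_PER_WORKER
                                  : maxInputBytesPerWorker;
  }

  @JsonProperty
  public long getMaxInputBytesPerWorker()
  {
    return maxInputBytesPerWorker;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Old payload without the field falls back to the 10 GiB default.
    System.out.println(mapper.readValue("{}", CompatExample.class).getMaxInputBytesPerWorker());
    // New payload carries the explicit value through.
    System.out.println(
        mapper.readValue("{\"maxInputBytesPerWorker\":1024}", CompatExample.class)
              .getMaxInputBytesPerWorker()
    );
  }
}
```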

View File

@@ -21,6 +21,7 @@ package org.apache.druid.msq.kernel;
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.segment.column.RowSignature;
@@ -42,6 +43,7 @@ public class StageDefinitionBuilder
  private int maxWorkerCount = 1;
  private ShuffleSpec shuffleSpec = null;
  private boolean shuffleCheckHasMultipleValues = false;
+ private long maxInputBytesPerWorker = Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER;
  /**
   * Package-private: callers should prefer {@link StageDefinition#builder(int)} rather than this constructor.
@@ -105,6 +107,12 @@ public class StageDefinitionBuilder
    return this;
  }
+ public StageDefinitionBuilder maxInputBytesPerWorker(final long maxInputBytesPerWorker)
+ {
+   this.maxInputBytesPerWorker = maxInputBytesPerWorker;
+   return this;
+ }
  int getStageNumber()
  {
    return stageNumber;
@@ -120,7 +128,8 @@ public class StageDefinitionBuilder
        signature,
        shuffleSpec,
        maxWorkerCount,
-       shuffleCheckHasMultipleValues
+       shuffleCheckHasMultipleValues,
+       maxInputBytesPerWorker
    );
  }
}

View File

@@ -57,7 +57,7 @@ public enum WorkerAssignmentStrategy
  /**
   * Use the lowest possible number of tasks, while keeping each task's workload under
-  * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link Limits#MAX_INPUT_BYTES_PER_WORKER} bytes.
+  * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@link StageDefinition#getMaxInputBytesPerWorker()} bytes.
   *
   * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
   */
@@ -75,7 +75,7 @@ public enum WorkerAssignmentStrategy
            inputSpec,
            stageDef.getMaxWorkerCount(),
            Limits.MAX_INPUT_FILES_PER_WORKER,
-           Limits.MAX_INPUT_BYTES_PER_WORKER
+           stageDef.getMaxInputBytesPerWorker()
        );
      } else {
        // In auto mode, if we can't slice inputs dynamically, we instead carry forwards the number of workers from
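
As the javadoc above states, `AUTO` aims for the fewest tasks that keep each worker under the file and byte limits. A hypothetical back-of-the-envelope sketch of how the per-worker byte limit interacts with the stage's `maxWorkerCount`; the `workerCount` helper and its ceiling division are illustrative only, since the real strategy slices concrete input specs via `InputSpecSlicer#sliceDynamic`:

```java
public class AutoWorkerCountExample
{
  // Workers needed to keep each under the byte limit, capped by the stage's
  // maximum worker count.
  static int workerCount(long totalInputBytes, long maxInputBytesPerWorker, int maxWorkerCount)
  {
    final long needed = (totalInputBytes + maxInputBytesPerWorker - 1) / maxInputBytesPerWorker;
    return (int) Math.min(Math.max(needed, 1), maxWorkerCount);
  }

  public static void main(String[] args)
  {
    final long gb = 1024L * 1024 * 1024;
    // 22 GB of input with a 10 GB per-worker limit needs 3 workers...
    System.out.println(workerCount(22 * gb, 10 * gb, 100)); // 3
    // ...unless the stage caps workers lower.
    System.out.println(workerCount(22 * gb, 10 * gb, 2));   // 2
  }
}
```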

View File

@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.sql.MSQMode;
import org.apache.druid.query.QueryContext;
@@ -63,6 +64,7 @@ public class MultiStageQueryContext
  public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
  public static final boolean DEFAULT_FAULT_TOLERANCE = false;
+ public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";
  public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
  public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.PARALLEL.toString();
@@ -115,6 +117,14 @@ public class MultiStageQueryContext
    );
  }
+ public static long getMaxInputBytesPerWorker(final QueryContext queryContext)
+ {
+   return queryContext.getLong(
+       CTX_MAX_INPUT_BYTES_PER_WORKER,
+       Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+   );
+ }
  public static boolean isComposedIntermediateSuperSorterStorageEnabled(final QueryContext queryContext)
  {
    return queryContext.getBoolean(
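
The getter above falls back to `Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER` when the context key is absent. A sketch of the expected behavior from a caller's perspective; `QueryContext.of` appears in the tests below, while `QueryContext.empty()` is assumed here as the usual companion factory method:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

public class ContextExample
{
  public static void main(String[] args)
  {
    // An explicit context value wins.
    final QueryContext explicit = QueryContext.of(
        ImmutableMap.of("maxInputBytesPerWorker", 1024)
    );
    System.out.println(MultiStageQueryContext.getMaxInputBytesPerWorker(explicit)); // 1024

    // Otherwise the 10 GiB default applies.
    System.out.println(
        MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.empty())
    );
  }
}
```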

View File

@@ -25,6 +25,7 @@ import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.SortColumn;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -55,7 +56,8 @@ public class StageDefinitionTest
        RowSignature.empty(),
        null,
        0,
-       false
+       false,
+       Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
    );
    Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionsForShuffle(null));
@@ -72,7 +74,8 @@ public class StageDefinitionTest
        RowSignature.empty(),
        new MaxCountShuffleSpec(new ClusterBy(ImmutableList.of(new SortColumn("test", false)), 1), 2, false),
        1,
-       false
+       false,
+       Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
    );
    Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionsForShuffle(null));
@@ -90,7 +93,8 @@ public class StageDefinitionTest
        RowSignature.empty(),
        new MaxCountShuffleSpec(new ClusterBy(ImmutableList.of(new SortColumn("test", false)), 0), 1, false),
        1,
-       false
+       false,
+       Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
    );
    Assert.assertThrows(

View File

@@ -100,6 +100,16 @@ public class MultiStageQueryContextTest
    );
  }
+ @Test
+ public void testGetMaxInputBytesPerWorker()
+ {
+   Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_BYTES_PER_WORKER, 1024);
+   Assert.assertEquals(
+       1024,
+       MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap)));
+ }
  @Test
  public void getAssignmentStrategy_parameterSetReturnsCorrectValue()
  {