mirror of https://github.com/apache/druid.git
Add a limit to the number of columns in the CLUSTERED BY clause (#13352)
* Add clustered by limit
* Change semantics, add docs
* Add fault class to the module
* Add test
* Disambiguate test
parent 309cae7b65
commit 9e938b5a6f
@@ -232,6 +232,7 @@ The following table lists query limits:

| Limit | Value | Error if exceeded |
|-------|-------|-------------------|
| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles` |
| Number of output partitions for any one stage. Number of segments generated during ingestion. | 25,000 | `TooManyPartitions` |
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of cluster by columns that can appear in a stage. | 1,500 | `TooManyClusteredByColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
@@ -262,6 +263,7 @@ The following table describes error codes you may encounter in the `multiStageQu

| Code | Meaning | Additional fields |
|------|---------|-------------------|
| `TooManyBuckets` | Exceeded the number of partition buckets for a stage. Partition buckets are only used for `segmentGranularity` during INSERT queries. The most common reason for this error is that your `segmentGranularity` is too narrow relative to the data. See the [Limits](#limits) table for the specific limit. | `maxBuckets`: The limit on buckets. |
| `TooManyInputFiles` | Exceeded the number of input files/segments per worker. See the [Limits](#limits) table for the specific limit. | `numInputFiles`: The total number of input files/segments for the stage.<br /><br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br /><br />`minNumWorker`: The minimum number of workers required for a successful run. |
| `TooManyPartitions` | Exceeded the number of partitions for a stage. The most common reason for this is that the final stage of an INSERT or REPLACE query generated too many segments. See the [Limits](#limits) table for the specific limit. | `maxPartitions`: The limit on partitions which was exceeded. |
| `TooManyClusteredByColumns` | Exceeded the number of cluster by columns for a stage. See the [Limits](#limits) table for the specific limit. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded.<br /><br />`stage`: The stage number exceeding the limit. |
| `TooManyColumns` | Exceeded the number of columns for a stage. See the [Limits](#limits) table for the specific limit. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| `TooManyWarnings` | Exceeded the allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit.<br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| `TooManyWorkers` | Exceeded the supported number of workers running simultaneously. See the [Limits](#limits) table for the specific limit. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously.<br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. |
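
For example, a query whose stage clusters on more columns than the limit might surface an error report along these lines (an illustrative sketch: the field values are made up, and the exact `errorMessage` wording comes from the fault class added later in this commit):

```json
{
  "errorCode": "TooManyClusteredByColumns",
  "errorMessage": "Too many cluster by columns present in stage [0] (requested = 1702, max = 1500)",
  "numColumns": 1702,
  "maxColumns": 1500,
  "stage": 0
}
```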
@@ -29,6 +29,16 @@ public class Limits
   */
  public static final int MAX_FRAME_COLUMNS = 2000;

  /**
   * Maximum number of columns that can appear in the CLUSTERED BY clause.
   *
   * The limit is somewhat arbitrary, but it is chosen so that the DataSketches sketches used to compute partitions
   * for the clustered-by keys do not blow up in memory. This limit, along with sequential merging of the sketches,
   * helps prevent OOMs in both the worker and controller tasks. With MAX_FRAME_COLUMNS = 2000, the cap works out to
   * (int) (2000 * 0.75) = 1500, matching the 1,500 figure in the docs table above.
   */
  public static final int MAX_CLUSTERED_BY_COLUMNS = (int) (MAX_FRAME_COLUMNS * 0.75);

  /**
   * Maximum number of workers that can be used in a stage, regardless of available memory.
   */
@@ -23,6 +23,7 @@ import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;

@@ -48,6 +49,17 @@ public class QueryValidator
      throw new MSQException(new TooManyColumnsFault(numColumns, Limits.MAX_FRAME_COLUMNS));
    }

    // Reject stages whose CLUSTERED BY key is wider than the new cap.
    final int numClusteredByColumns = stageDef.getClusterBy().getColumns().size();
    if (numClusteredByColumns > Limits.MAX_CLUSTERED_BY_COLUMNS) {
      throw new MSQException(
          new TooManyClusteredByColumnsFault(
              numClusteredByColumns,
              Limits.MAX_CLUSTERED_BY_COLUMNS,
              stageDef.getStageNumber()
          )
      );
    }

    final int numWorkers = stageDef.getMaxWorkerCount();
    if (numWorkers > Limits.MAX_WORKERS) {
      throw new MSQException(new TooManyWorkersFault(numWorkers, Limits.MAX_WORKERS));
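
A caller that wants to react to this condition programmatically might do something like the following sketch. This is illustrative only, not part of the patch: `queryDef` is a hypothetical `QueryDefinition`, and it assumes `QueryValidator.validateQueryDef` as the entry point and an `MSQException#getFault` accessor, both of which live elsewhere in the MSQ engine and are not shown in this diff.

```java
// Hypothetical caller-side handling of the new fault (illustrative only).
try {
  QueryValidator.validateQueryDef(queryDef); // validates each stage, including the CLUSTERED BY cap
}
catch (MSQException e) {
  if (e.getFault() instanceof TooManyClusteredByColumnsFault) {
    final TooManyClusteredByColumnsFault fault = (TooManyClusteredByColumnsFault) e.getFault();
    // Surface an actionable hint: the CLUSTERED BY list must shrink below the cap.
    System.err.printf(
        "Stage %d clusters on %d columns; the maximum is %d%n",
        fault.getStage(), fault.getNumColumns(), fault.getMaxColumns()
    );
  }
  throw e;
}
```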
@@ -59,6 +59,7 @@ import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.TooManyBucketsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;

@@ -118,6 +119,7 @@ public class MSQIndexingModule implements DruidModule
        RowTooLargeFault.class,
        TaskStartTimeoutFault.class,
        TooManyBucketsFault.class,
        TooManyClusteredByColumnsFault.class,
        TooManyColumnsFault.class,
        TooManyInputFilesFault.class,
        TooManyPartitionsFault.class,
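
Adding the fault class to this list is what the commit message means by "add fault class to the module": the module presumably registers each listed class as a Jackson subtype, so the new fault can be serialized into task reports and deserialized back (the serde test at the end of this commit exercises exactly that round trip).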
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName(TooManyClusteredByColumnsFault.CODE)
public class TooManyClusteredByColumnsFault extends BaseMSQFault
{
  static final String CODE = "TooManyClusteredByColumns";

  private final int numColumns;
  private final int maxColumns;
  private final int stage;

  @JsonCreator
  public TooManyClusteredByColumnsFault(
      @JsonProperty("numColumns") final int numColumns,
      @JsonProperty("maxColumns") final int maxColumns,
      @JsonProperty("stage") final int stage
  )
  {
    super(CODE, "Too many cluster by columns present in stage [%s] (requested = %d, max = %d)", stage, numColumns, maxColumns);
    this.numColumns = numColumns;
    this.maxColumns = maxColumns;
    this.stage = stage;
  }

  @JsonProperty
  public int getNumColumns()
  {
    return numColumns;
  }

  @JsonProperty
  public int getMaxColumns()
  {
    return maxColumns;
  }

  @JsonProperty
  public int getStage()
  {
    return stage;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    if (!super.equals(o)) {
      return false;
    }
    final TooManyClusteredByColumnsFault that = (TooManyClusteredByColumnsFault) o;
    return numColumns == that.numColumns && maxColumns == that.maxColumns && stage == that.stage;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), numColumns, maxColumns, stage);
  }
}
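
Given the Jackson annotations above, the fault should round-trip through JSON roughly as in this sketch. It is illustrative, not from the patch: it assumes the `MSQFault` interface carries `@JsonTypeInfo` keyed on the `errorCode` property (which the `@JsonTypeName` here suggests) and uses a plain `ObjectMapper` rather than Druid's injected one.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;

public class FaultSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    // Illustrative serde round trip; MSQFaultSerdeTest below verifies the real thing.
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerSubtypes(TooManyClusteredByColumnsFault.class);

    MSQFault fault = new TooManyClusteredByColumnsFault(1702, 1500, 0);
    String json = mapper.writeValueAsString(fault);
    // json resembles: {"errorCode":"TooManyClusteredByColumns","numColumns":1702,"maxColumns":1500,"stage":0,...}

    MSQFault roundTripped = mapper.readValue(json, MSQFault.class);
    // equals() covers numColumns, maxColumns, and stage, so the round trip compares equal.
    System.out.println(fault.equals(roundTripped)); // true, assuming the type-info setup above
  }
}
```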
@@ -27,10 +27,12 @@ import com.google.common.hash.Hashing;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

@@ -53,6 +55,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MSQInsertTest extends MSQTestBase
{
@@ -444,6 +448,49 @@ public class MSQInsertTest extends MSQTestBase
        .verifyPlanningErrors();
  }

  @Test
  public void testInsertWithHugeClusteringKeys()
  {
    RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();

    final int numColumns = 1700;

    String columnNames = IntStream.range(1, numColumns)
                                  .mapToObj(i -> "col" + i)
                                  .collect(Collectors.joining(", "));

    String clusteredByClause = IntStream.range(1, numColumns + 1)
                                        .mapToObj(String::valueOf)
                                        .collect(Collectors.joining(", "));

    String externSignature = IntStream.range(1, numColumns)
                                      .mapToObj(i -> StringUtils.format(
                                          "{\"name\": \"col%d\", \"type\": \"string\"}",
                                          i
                                      ))
                                      .collect(Collectors.joining(", "));

    testIngestQuery()
        .setSql(StringUtils.format(
            " insert into foo1 SELECT\n"
            + "  floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
            + " %s\n"
            + "FROM TABLE(\n"
            + "  EXTERN(\n"
            + "    '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
            + "    '{\"type\": \"json\"}',\n"
            + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, %s]'\n"
            + "  )\n"
            + ") PARTITIONED by day CLUSTERED BY %s",
            columnNames,
            externSignature,
            clusteredByClause
        ))
        .setExpectedDataSource("foo1")
        .setExpectedRowSignature(dummyRowSignature)
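        // numColumns + 2 in the expected fault: the stage's CLUSTERED BY key presumably also carries
        // the segment-granularity time bucket and MSQ's internal partition-boost column.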
        .setExpectedMSQFault(new TooManyClusteredByColumnsFault(numColumns + 2, 1500, 0))
        .verifyResults();
  }

  @Test
  public void testInsertRestrictedColumns()
  {
@@ -70,6 +70,7 @@ public class MSQFaultSerdeTest
    assertFaultSerde(new TaskStartTimeoutFault(10));
    assertFaultSerde(new TooManyBucketsFault(10));
    assertFaultSerde(new TooManyColumnsFault(10, 8));
    assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8, 1));
    assertFaultSerde(new TooManyInputFilesFault(15, 10, 5));
    assertFaultSerde(new TooManyPartitionsFault(10));
    assertFaultSerde(new TooManyWarningsFault(10, "the error"));