Adjust defaults for hashed partitioning (#8565)

* Adjust defaults for hashed partitioning

If neither the partition size nor the number of shards are specified,
default to partitions of 5,000,000 rows (similar to the behavior of
dynamic partitions). Previously, both could be null and cause incorrect
behavior.

Specifying both a partition size and a number of shards now results in
an error instead of ignoring the partition size in favor of using the
number of shards. This is a behavior change that makes it more apparent
to the user that only one of the two properties will be honored
(previously, a message was just logged when the specified partition size
was ignored).

* Fix test

* Handle -1 as null

* Add -1 as null tests for single dim partitioning

* Simplify logic to handle -1 as null

* Address review comments
This commit is contained in:
Chi Cao Minh 2019-09-21 20:57:40 -07:00 committed by Fangjin Yang
parent 99b6eedab5
commit aeac0d4fd3
10 changed files with 210 additions and 86 deletions

View File

@ -25,23 +25,38 @@ package org.apache.druid.indexer.partitions;
class Checks
{
/**
* @return Non-null value, or first one if both are null
* @return Non-null value, or first one if both are null. -1 is interpreted as null for historical reasons.
*/
@SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if
static Property<Integer> checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2)
static Property<Integer> checkAtMostOneNotNull(Property<Integer> property1, Property<Integer> property2)
{
final Property<Integer> property;
if (value1 == null && value2 == null) {
property = new Property<>(name1, value1);
} else if (value1 == null) {
property = new Property<>(name2, value2);
} else if (value2 == null) {
property = new Property<>(name1, value1);
boolean isNull1 = property1.getValue() == null;
boolean isNull2 = property2.getValue() == null;
if (isNull1 && isNull2) {
property = property1;
} else if (isNull1) {
property = property2;
} else if (isNull2) {
property = property1;
} else {
throw new IllegalArgumentException("At most one of " + name1 + " or " + name2 + " must be present");
throw new IllegalArgumentException(
"At most one of " + property1.getName() + " or " + property2.getName() + " must be present"
);
}
return property;
}
/**
* @return Non-null value, or first one if both are null. -1 is interpreted as null for historical reasons.
*/
static Property<Integer> checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2)
{
Property<Integer> property1 = new Property<>(name1, value1);
Property<Integer> property2 = new Property<>(name2, value2);
return checkAtMostOneNotNull(property1, property2);
}
}

View File

@ -21,8 +21,8 @@ package org.apache.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.util.Collections;
@ -32,7 +32,8 @@ import java.util.Objects;
public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
{
static final String NAME = "hashed";
private static final Logger LOG = new Logger(HashedPartitionsSpec.class);
@VisibleForTesting
static final String NUM_SHARDS = "numShards";
@Nullable
private final Integer maxRowsPerSegment;
@ -48,7 +49,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
@JsonCreator
public HashedPartitionsSpec(
@JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty(NUM_SHARDS) @Nullable Integer numShards,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
// Deprecated properties preserved for backward compatibility:
@ -58,50 +59,48 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
Integer maxRowsPerSegment
)
{
Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment);
Integer adjustedNumShards = PartitionsSpec.resolveHistoricalNullIfNeeded(numShards);
Integer adjustedTargetPartitionSize = PartitionsSpec.resolveHistoricalNullIfNeeded(targetPartitionSize);
Integer adjustedMaxRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(maxRowsPerSegment);
// targetRowsPerSegment, targetPartitionSize, and maxRowsPerSegment are aliases
Property<Integer> target = Checks.checkAtMostOneNotNull(
DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
targetRowsPerSegment,
adjustedTargetRowsPerSegment,
DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
targetPartitionSize
adjustedTargetPartitionSize
);
target = Checks.checkAtMostOneNotNull(
target,
new Property<>(PartitionsSpec.MAX_ROWS_PER_SEGMENT, adjustedMaxRowsPerSegment)
);
Preconditions.checkArgument(
PartitionsSpec.isEffectivelyNull(target.getValue()) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
"Can't set both " + target.getName() + " and maxRowsPerSegment"
);
final Integer realMaxRowsPerSegment = target.getValue() == null ? maxRowsPerSegment : target.getValue();
Preconditions.checkArgument(
PartitionsSpec.isEffectivelyNull(realMaxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards),
"Can't use maxRowsPerSegment or " + target.getName() + " and numShards together"
);
// Needs to determine partitions if the _given_ numShards is null
this.maxRowsPerSegment = getValidMaxRowsPerSegment(realMaxRowsPerSegment, numShards);
this.numShards = PartitionsSpec.isEffectivelyNull(numShards) ? null : numShards;
// targetRowsPerSegment/targetPartitionSize/maxRowsPerSegment and numShards are incompatible
Checks.checkAtMostOneNotNull(target, new Property<>(NUM_SHARDS, adjustedNumShards));
this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
this.numShards = adjustedNumShards;
// Supply default for targetRowsPerSegment if needed
if (target.getValue() == null) {
//noinspection VariableNotUsedInsideIf (false positive for this.numShards)
this.maxRowsPerSegment = (this.numShards == null ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT : null);
} else {
this.maxRowsPerSegment = target.getValue();
}
Preconditions.checkArgument(
this.maxRowsPerSegment == null || this.maxRowsPerSegment > 0,
"maxRowsPerSegment[%s] should be positive",
this.maxRowsPerSegment
"%s[%s] should be positive",
target.getName(),
target.getValue()
);
Preconditions.checkArgument(
this.numShards == null || this.numShards > 0,
"numShards[%s] should be positive",
this.numShards
);
final boolean needsPartitionDetermination = needsDeterminePartitions(numShards);
if (!needsPartitionDetermination) {
Preconditions.checkState(
this.maxRowsPerSegment == null,
"maxRowsPerSegment[%s] must be null if we don't need to determine partitions",
this.maxRowsPerSegment
);
Preconditions.checkState(
this.numShards != null,
"numShards must not be null if we don't need to determine partitions"
);
}
}
public HashedPartitionsSpec(
@ -113,25 +112,6 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
this(null, numShards, partitionDimensions, null, maxRowsPerSegment);
}
private static boolean needsDeterminePartitions(@Nullable Integer numShards)
{
return PartitionsSpec.isEffectivelyNull(numShards);
}
@Nullable
private static Integer getValidMaxRowsPerSegment(@Nullable Integer maxRowsPerSegment, @Nullable Integer numShards)
{
if (needsDeterminePartitions(numShards)) {
return PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) ? null : maxRowsPerSegment;
} else {
if (!PartitionsSpec.isEffectivelyNull(maxRowsPerSegment)) {
LOG.warn("maxRowsPerSegment[%s] is ignored since numShards[%s] is specified", maxRowsPerSegment, numShards);
}
return null;
}
}
@Nullable
@Override
@JsonProperty

View File

@ -30,7 +30,8 @@ import javax.annotation.Nullable;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class),
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class), // for backward compatibility
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class),
// for backward compatibility
@JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class),
@JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class)
})
@ -38,6 +39,7 @@ public interface PartitionsSpec
{
int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
String MAX_ROWS_PER_SEGMENT = "maxRowsPerSegment";
int HISTORICAL_NULL = -1;
/**
* Returns the max number of rows per segment.
@ -58,7 +60,7 @@ public interface PartitionsSpec
*/
static boolean isEffectivelyNull(@Nullable Integer val)
{
return val == null || val == -1;
return val == null || val == HISTORICAL_NULL;
}
/**
@ -66,6 +68,12 @@ public interface PartitionsSpec
*/
static boolean isEffectivelyNull(@Nullable Long val)
{
return val == null || val == -1;
return val == null || val == HISTORICAL_NULL;
}
@Nullable
static Integer resolveHistoricalNullIfNeeded(@Nullable Integer val)
{
return isEffectivelyNull(val) ? null : val;
}
}

View File

@ -64,18 +64,23 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSp
Integer maxPartitionSize // prefer maxRowsPerSegment
)
{
Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment);
Integer adjustedMaxRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(maxRowsPerSegment);
Integer adjustedTargetPartitionSize = PartitionsSpec.resolveHistoricalNullIfNeeded(targetPartitionSize);
Integer adjustedMaxPartitionSize = PartitionsSpec.resolveHistoricalNullIfNeeded(maxPartitionSize);
Property<Integer> target = Checks.checkAtMostOneNotNull(
DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
targetRowsPerSegment,
adjustedTargetRowsPerSegment,
DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
targetPartitionSize
adjustedTargetPartitionSize
);
Property<Integer> max = Checks.checkAtMostOneNotNull(
PartitionsSpec.MAX_ROWS_PER_SEGMENT,
maxRowsPerSegment,
adjustedMaxRowsPerSegment,
MAX_PARTITION_SIZE,
maxPartitionSize
adjustedMaxPartitionSize
);
Preconditions.checkArgument(

View File

@ -32,6 +32,7 @@ public class SingleDimensionPartitionsSpecTest
{
private static final Integer TARGET_ROWS_PER_SEGMENT = 1;
private static final Integer MAX_ROWS_PER_SEGMENT = null;
private static final Integer HISTORICAL_NULL = PartitionsSpec.HISTORICAL_NULL;
private static final String PARTITION_DIMENSION = "a";
private static final boolean ASSUME_GROUPED = false;
private static final SingleDimensionPartitionsSpec SPEC = new SingleDimensionPartitionsSpec(
@ -90,7 +91,6 @@ public class SingleDimensionPartitionsSpecTest
{
new Tester()
.testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
}
@Test
@ -101,6 +101,14 @@ public class SingleDimensionPartitionsSpecTest
.testIllegalArgumentException("targetRowsPerSegment must be greater than 0");
}
@Test
public void targetRowsPerSegmentHistoricalNull()
{
new Tester()
.targetRowsPerSegment(HISTORICAL_NULL)
.testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
}
@Test
public void targetPartitionSizeMustBePositive()
{
@ -133,6 +141,14 @@ public class SingleDimensionPartitionsSpecTest
.testIllegalArgumentException("maxRowsPerSegment must be greater than 0");
}
@Test
public void maxRowsPerSegmentHistoricalNull()
{
new Tester()
.maxRowsPerSegment(HISTORICAL_NULL)
.testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
}
@Test
public void maxPartitionSizeMustBePositive()
{
@ -141,6 +157,14 @@ public class SingleDimensionPartitionsSpecTest
.testIllegalArgumentException("maxPartitionSize must be greater than 0");
}
@Test
public void maxPartitionHistoricalNull()
{
new Tester()
.maxPartitionSize(HISTORICAL_NULL)
.testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
}
@Test
public void resolvesMaxFromTargetRowsPerSegment()
{

View File

@ -337,7 +337,7 @@ The configuration options are:
|Field|Description|Required|
|--------|-----------|---------|
|type|Type of partitionSpec to be used.|"hashed"|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB. Defaults to 5000000 if `numShards` is not set.|either this or `numShards`|
|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`|

View File

@ -229,7 +229,7 @@ For perfect rollup, you should use `hashed`.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `hashed`|none|yes|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|null|either this or `numShards`|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|5000000 (if `numShards` is not set)|either this or `numShards`|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `targetRowsPerSegment` is set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|null|no|

View File

@ -264,7 +264,7 @@ public class HadoopIngestionSpecTest
Assert.assertFalse("overwriteFiles", schema.getTuningConfig().isOverwriteFiles());
Assert.assertFalse(
Assert.assertTrue(
"isDeterminingPartitions",
schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true)
);

View File

@ -21,12 +21,19 @@ package org.apache.druid.indexer.partitions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class HashedPartitionsSpecTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@ -35,7 +42,7 @@ public class HashedPartitionsSpecTest
public ExpectedException exception = ExpectedException.none();
@Test
public void testHashedPartitionsSpec()
public void havingTargetRowsPerSegmentOnly()
{
final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(
"{"
@ -52,7 +59,7 @@ public class HashedPartitionsSpecTest
100,
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue()
);
Assert.assertNull(hadoopHashedPartitionsSpec.getNumShards());
Assert.assertEquals(
"getPartitionDimensions",
ImmutableList.of(),
@ -61,7 +68,7 @@ public class HashedPartitionsSpecTest
}
@Test
public void testHashedPartitionsSpecShardCount()
public void havingNumShardsOnly()
{
final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(
"{"
@ -92,21 +99,104 @@ public class HashedPartitionsSpecTest
}
@Test
public void testHashedPartitionsSpecBothTargetForbidden()
public void havingIncompatiblePropertiesIsForbidden()
{
exception.expect(RuntimeException.class);
exception.expectMessage("At most one of targetRowsPerSegment or targetPartitionSize must be present");
final String targetRowsPerSegment = DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT;
final String targetPartitionSize = DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE;
final String maxRowsPerSegment = PartitionsSpec.MAX_ROWS_PER_SEGMENT;
final String numShards = HashedPartitionsSpec.NUM_SHARDS;
String json = "{"
+ "\"type\":\"hashed\""
+ ",\"targetRowsPerSegment\":100"
+ ",\"targetPartitionSize\":100"
+ "}";
jsonReadWriteRead(json);
Multimap<String, String> incompatiblePairs = ImmutableMultimap.<String, String>builder()
.put(targetRowsPerSegment, targetPartitionSize)
.put(targetRowsPerSegment, maxRowsPerSegment)
.put(targetRowsPerSegment, numShards)
.put(targetPartitionSize, maxRowsPerSegment)
.put(targetPartitionSize, numShards)
.put(maxRowsPerSegment, numShards)
.build();
for (Map.Entry<String, String> test : incompatiblePairs.entries()) {
String first = test.getKey();
String second = test.getValue();
String reasonPrefix = first + "/" + second;
String json = "{"
+ "\"type\":\"hashed\""
+ ",\"" + first + "\":100"
+ ",\"" + second + "\":100"
+ "}";
try {
jsonReadWriteRead(json);
Assert.fail(reasonPrefix + " did not throw exception");
}
catch (RuntimeException e) {
String expectedMessage = "At most one of " + first + " or " + second + " must be present";
Assert.assertThat(
reasonPrefix + " has wrong failure message",
e.getMessage(),
CoreMatchers.containsString(expectedMessage)
);
}
}
}
@Test
public void testHashedPartitionsSpecBackwardCompatibleTargetPartitionSize()
public void defaults()
{
final HashedPartitionsSpec spec = jsonReadWriteRead("{\"type\":\"hashed\"}");
Assert.assertNotNull(spec.getMaxRowsPerSegment());
Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, spec.getMaxRowsPerSegment().intValue());
Assert.assertNull(spec.getNumShards());
Assert.assertEquals(Collections.emptyList(), spec.getPartitionDimensions());
}
@Test
public void handlesHistoricalNull()
{
String json = "{"
+ "\"type\":\"hashed\""
+ ",\"targetRowsPerSegment\":" + PartitionsSpec.HISTORICAL_NULL
+ ",\"numShards\":" + PartitionsSpec.HISTORICAL_NULL
+ "}";
final HashedPartitionsSpec spec = jsonReadWriteRead(json);
Assert.assertNotNull(spec.getMaxRowsPerSegment());
Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, spec.getMaxRowsPerSegment().intValue());
Assert.assertNull(spec.getNumShards());
Assert.assertEquals(Collections.emptyList(), spec.getPartitionDimensions());
}
@Test
public void failsIfNotPositive()
{
List<String> properties = ImmutableList.of(
DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
PartitionsSpec.MAX_ROWS_PER_SEGMENT,
HashedPartitionsSpec.NUM_SHARDS
);
for (String property : properties) {
String json = "{"
+ "\"type\":\"hashed\""
+ ",\"" + property + "\":0"
+ "}";
try {
jsonReadWriteRead(json);
Assert.fail(property + " did not throw exception");
}
catch (RuntimeException e) {
String expectedMessage = property + "[0] should be positive";
Assert.assertThat(
property + " has wrong failure message",
e.getMessage(),
CoreMatchers.containsString(expectedMessage)
);
}
}
}
@Test
public void backwardCompatibleWithTargetPartitionSize()
{
String json = "{"
+ "\"type\":\"hashed\""
@ -123,7 +213,7 @@ public class HashedPartitionsSpecTest
}
@Test
public void testHashedPartitionsSpecBackwardCompatibleMaxRowsPerSegment()
public void backwardCompatibleWithMaxRowsPerSegment()
{
String json = "{"
+ "\"type\":\"hashed\""

View File

@ -28,6 +28,7 @@ import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
@ -159,7 +160,8 @@ public class TaskSerdeTest
);
Assert.assertNull(tuningConfig.getNumShards());
Assert.assertNull(tuningConfig.getMaxRowsPerSegment());
Assert.assertNotNull(tuningConfig.getMaxRowsPerSegment());
Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, tuningConfig.getMaxRowsPerSegment().intValue());
}
@Test