mirror of https://github.com/apache/druid.git
Rename partition spec fields (#8507)
* Rename partition spec fields Rename partition spec fields to be consistent across the various types (hashed, single_dim, dynamic). Specifically, use targetNumRowsPerSegment and maxRowsPerSegment in favor of targetPartitionSize and maxSegmentSize. Consistent and clearer names are easier for users to understand and use. Also fix various IntelliJ inspection warnings and doc spelling mistakes. * Fix test * Improve docs * Add targetRowsPerSegment to HashedPartitionsSpec
This commit is contained in:
parent
187b507b3d
commit
99b6eedab5
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexer.partitions;
|
||||
|
||||
/**
|
||||
* Various helper methods useful for checking the validity of arguments to spec constructors.
|
||||
*/
|
||||
class Checks
|
||||
{
|
||||
/**
|
||||
* @return Non-null value, or first one if both are null
|
||||
*/
|
||||
@SuppressWarnings("VariableNotUsedInsideIf") // false positive: checked for 'null' not used inside 'if
|
||||
static Property<Integer> checkAtMostOneNotNull(String name1, Integer value1, String name2, Integer value2)
|
||||
{
|
||||
final Property<Integer> property;
|
||||
|
||||
if (value1 == null && value2 == null) {
|
||||
property = new Property<>(name1, value1);
|
||||
} else if (value1 == null) {
|
||||
property = new Property<>(name2, value2);
|
||||
} else if (value2 == null) {
|
||||
property = new Property<>(name1, value1);
|
||||
} else {
|
||||
throw new IllegalArgumentException("At most one of " + name1 + " or " + name2 + " must be present");
|
||||
}
|
||||
|
||||
return property;
|
||||
}
|
||||
}
|
|
@ -26,5 +26,11 @@ import java.util.List;
|
|||
*/
|
||||
public interface DimensionBasedPartitionsSpec extends PartitionsSpec
|
||||
{
|
||||
String TARGET_ROWS_PER_SEGMENT = "targetRowsPerSegment";
|
||||
|
||||
// Deprecated properties preserved for backward compatibility:
|
||||
@Deprecated
|
||||
String TARGET_PARTITION_SIZE = "targetPartitionSize";
|
||||
|
||||
List<String> getPartitionDimensions();
|
||||
}
|
||||
|
|
|
@ -31,13 +31,14 @@ import java.util.Objects;
|
|||
public class DynamicPartitionsSpec implements PartitionsSpec
|
||||
{
|
||||
public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
|
||||
static final String NAME = "dynamic";
|
||||
|
||||
private final int maxRowsPerSegment;
|
||||
private final long maxTotalRows;
|
||||
|
||||
@JsonCreator
|
||||
public DynamicPartitionsSpec(
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows
|
||||
)
|
||||
{
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Objects;
|
|||
|
||||
public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
|
||||
{
|
||||
static final String NAME = "hashed";
|
||||
private static final Logger LOG = new Logger(HashedPartitionsSpec.class);
|
||||
|
||||
@Nullable
|
||||
|
@ -41,34 +42,37 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
|
|||
|
||||
public static HashedPartitionsSpec defaultSpec()
|
||||
{
|
||||
return new HashedPartitionsSpec(null, null, null, null);
|
||||
}
|
||||
|
||||
public HashedPartitionsSpec(
|
||||
@Nullable Integer maxRowsPerSegment,
|
||||
@Nullable Integer numShards,
|
||||
@Nullable List<String> partitionDimensions
|
||||
)
|
||||
{
|
||||
this(null, maxRowsPerSegment, numShards, partitionDimensions);
|
||||
return new HashedPartitionsSpec(null, null, null, null, null);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public HashedPartitionsSpec(
|
||||
@JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
|
||||
@JsonProperty("numShards") @Nullable Integer numShards,
|
||||
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions
|
||||
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
|
||||
|
||||
// Deprecated properties preserved for backward compatibility:
|
||||
@Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
|
||||
Integer targetPartitionSize,
|
||||
@Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable
|
||||
Integer maxRowsPerSegment
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(
|
||||
PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
|
||||
"Can't set both targetPartitionSize and maxRowsPerSegment"
|
||||
Property<Integer> target = Checks.checkAtMostOneNotNull(
|
||||
DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
|
||||
targetRowsPerSegment,
|
||||
DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
|
||||
targetPartitionSize
|
||||
);
|
||||
final Integer realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize;
|
||||
|
||||
Preconditions.checkArgument(
|
||||
PartitionsSpec.isEffectivelyNull(target.getValue()) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
|
||||
"Can't set both " + target.getName() + " and maxRowsPerSegment"
|
||||
);
|
||||
final Integer realMaxRowsPerSegment = target.getValue() == null ? maxRowsPerSegment : target.getValue();
|
||||
Preconditions.checkArgument(
|
||||
PartitionsSpec.isEffectivelyNull(realMaxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards),
|
||||
"Can't use maxRowsPerSegment or targetPartitionSize and numShards together"
|
||||
"Can't use maxRowsPerSegment or " + target.getName() + " and numShards together"
|
||||
);
|
||||
// Needs to determine partitions if the _given_ numShards is null
|
||||
this.maxRowsPerSegment = getValidMaxRowsPerSegment(realMaxRowsPerSegment, numShards);
|
||||
|
@ -100,6 +104,16 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
|
|||
}
|
||||
}
|
||||
|
||||
public HashedPartitionsSpec(
|
||||
@Nullable Integer maxRowsPerSegment,
|
||||
@Nullable Integer numShards,
|
||||
@Nullable List<String> partitionDimensions
|
||||
)
|
||||
{
|
||||
this(null, numShards, partitionDimensions, null, maxRowsPerSegment);
|
||||
}
|
||||
|
||||
|
||||
private static boolean needsDeterminePartitions(@Nullable Integer numShards)
|
||||
{
|
||||
return PartitionsSpec.isEffectivelyNull(numShards);
|
||||
|
|
|
@ -29,14 +29,15 @@ import javax.annotation.Nullable;
|
|||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "single_dim", value = SingleDimensionPartitionsSpec.class),
|
||||
@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), // for backward compatibility
|
||||
@JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class),
|
||||
@JsonSubTypes.Type(name = "dynamic", value = DynamicPartitionsSpec.class)
|
||||
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class),
|
||||
@JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class), // for backward compatibility
|
||||
@JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class),
|
||||
@JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class)
|
||||
})
|
||||
public interface PartitionsSpec
|
||||
{
|
||||
int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
|
||||
String MAX_ROWS_PER_SEGMENT = "maxRowsPerSegment";
|
||||
|
||||
/**
|
||||
* Returns the max number of rows per segment.
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexer.partitions;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Convenience class for holding a pair of string key and templated value.
|
||||
*/
|
||||
class Property<T>
|
||||
{
|
||||
private final String name;
|
||||
private final T value;
|
||||
|
||||
Property(String name, T value)
|
||||
{
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
public T getValue()
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Property<?> property = (Property<?>) o;
|
||||
return Objects.equals(name, property.name) &&
|
||||
Objects.equals(value, property.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(name, value);
|
||||
}
|
||||
}
|
|
@ -21,78 +21,127 @@ package org.apache.druid.indexer.partitions;
|
|||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Partition a segment by a single dimension.
|
||||
*/
|
||||
public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec
|
||||
{
|
||||
private final int maxRowsPerSegment;
|
||||
private final int maxPartitionSize;
|
||||
@Nullable
|
||||
static final String NAME = "single_dim";
|
||||
static final String OLD_NAME = "dimension"; // for backward compatibility
|
||||
|
||||
private static final String MAX_PARTITION_SIZE = "maxPartitionSize";
|
||||
|
||||
private final Integer targetRowsPerSegment;
|
||||
private final Integer maxRowsPerSegment;
|
||||
private final String partitionDimension;
|
||||
private final boolean assumeGrouped;
|
||||
|
||||
// Values for these fields are derived from the one above:
|
||||
private final int resolvedMaxRowPerSegment;
|
||||
|
||||
@JsonCreator
|
||||
public SingleDimensionPartitionsSpec(
|
||||
int maxRowsPerSegment,
|
||||
@Nullable Integer maxPartitionSize,
|
||||
@JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
|
||||
@JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("partitionDimension") @Nullable String partitionDimension,
|
||||
@JsonProperty("assumeGrouped") boolean assumeGrouped, // false by default
|
||||
|
||||
// Deprecated properties preserved for backward compatibility:
|
||||
@Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
|
||||
Integer targetPartitionSize, // prefer targetRowsPerSegment
|
||||
@Deprecated @JsonProperty(MAX_PARTITION_SIZE) @Nullable
|
||||
Integer maxPartitionSize // prefer maxRowsPerSegment
|
||||
)
|
||||
{
|
||||
Property<Integer> target = Checks.checkAtMostOneNotNull(
|
||||
DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
|
||||
targetRowsPerSegment,
|
||||
DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
|
||||
targetPartitionSize
|
||||
);
|
||||
|
||||
Property<Integer> max = Checks.checkAtMostOneNotNull(
|
||||
PartitionsSpec.MAX_ROWS_PER_SEGMENT,
|
||||
maxRowsPerSegment,
|
||||
MAX_PARTITION_SIZE,
|
||||
maxPartitionSize
|
||||
);
|
||||
|
||||
Preconditions.checkArgument(
|
||||
(target.getValue() == null) != (max.getValue() == null),
|
||||
"Exactly one of " + target.getName() + " or " + max.getName() + " must be present"
|
||||
);
|
||||
|
||||
this.partitionDimension = partitionDimension;
|
||||
this.assumeGrouped = assumeGrouped;
|
||||
this.targetRowsPerSegment = target.getValue();
|
||||
this.maxRowsPerSegment = max.getValue();
|
||||
|
||||
this.resolvedMaxRowPerSegment = resolveMaxRowsPerSegment(target, max);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public SingleDimensionPartitionsSpec(
|
||||
@Nullable Integer targetRowsPerSegment,
|
||||
@Nullable Integer maxRowsPerSegment,
|
||||
@Nullable String partitionDimension,
|
||||
boolean assumeGrouped
|
||||
)
|
||||
{
|
||||
this(null, maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped);
|
||||
this(targetRowsPerSegment, maxRowsPerSegment, partitionDimension, assumeGrouped, null, null);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public SingleDimensionPartitionsSpec(
|
||||
@JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxPartitionSize") @Nullable Integer maxPartitionSize,
|
||||
@JsonProperty("partitionDimension") @Nullable String partitionDimension,
|
||||
@JsonProperty("assumeGrouped") boolean assumeGrouped // false by default
|
||||
)
|
||||
private static int resolveMaxRowsPerSegment(Property<Integer> target, Property<Integer> max)
|
||||
{
|
||||
Preconditions.checkArgument(
|
||||
PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
|
||||
"Can't set both targetPartitionSize and maxRowsPerSegment"
|
||||
);
|
||||
Preconditions.checkArgument(
|
||||
!PartitionsSpec.isEffectivelyNull(targetPartitionSize) || !PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
|
||||
"Either targetPartitionSize or maxRowsPerSegment must be specified"
|
||||
);
|
||||
final int realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize;
|
||||
Preconditions.checkArgument(realMaxRowsPerSegment > 0, "maxRowsPerSegment must be specified");
|
||||
this.maxRowsPerSegment = realMaxRowsPerSegment;
|
||||
this.maxPartitionSize = PartitionsSpec.isEffectivelyNull(maxPartitionSize)
|
||||
? Math.addExact(realMaxRowsPerSegment, (int) (realMaxRowsPerSegment * 0.5))
|
||||
: maxPartitionSize;
|
||||
this.partitionDimension = partitionDimension;
|
||||
this.assumeGrouped = assumeGrouped;
|
||||
final int resolvedValue;
|
||||
|
||||
if (target.getValue() != null) {
|
||||
Preconditions.checkArgument(target.getValue() > 0, target.getName() + " must be greater than 0");
|
||||
try {
|
||||
resolvedValue = Math.addExact(target.getValue(), (target.getValue() / 2));
|
||||
}
|
||||
catch (ArithmeticException e) {
|
||||
throw new IllegalArgumentException(target.getName() + " is too large");
|
||||
}
|
||||
} else {
|
||||
Preconditions.checkArgument(max.getValue() > 0, max.getName() + " must be greater than 0");
|
||||
resolvedValue = max.getValue();
|
||||
}
|
||||
return resolvedValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
public Integer getTargetRowsPerSegment()
|
||||
{
|
||||
return targetRowsPerSegment;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
@Override
|
||||
@NotNull
|
||||
public Integer getMaxRowsPerSegment()
|
||||
{
|
||||
return resolvedMaxRowPerSegment; // NOTE: This returns the *resolved* value
|
||||
}
|
||||
|
||||
@JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT)
|
||||
private Integer getMaxRowsPerSegmentForJson()
|
||||
{
|
||||
return maxRowsPerSegment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsDeterminePartitions(boolean useForHadoopTask)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getMaxPartitionSize()
|
||||
{
|
||||
return maxPartitionSize;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
public String getPartitionDimension()
|
||||
|
@ -106,12 +155,19 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSp
|
|||
return assumeGrouped;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
@Override
|
||||
public List<String> getPartitionDimensions()
|
||||
{
|
||||
return partitionDimension == null ? Collections.emptyList() : Collections.singletonList(partitionDimension);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsDeterminePartitions(boolean useForHadoopTask)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -122,26 +178,34 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSp
|
|||
return false;
|
||||
}
|
||||
SingleDimensionPartitionsSpec that = (SingleDimensionPartitionsSpec) o;
|
||||
return maxRowsPerSegment == that.maxRowsPerSegment &&
|
||||
maxPartitionSize == that.maxPartitionSize &&
|
||||
assumeGrouped == that.assumeGrouped &&
|
||||
return assumeGrouped == that.assumeGrouped &&
|
||||
resolvedMaxRowPerSegment == that.resolvedMaxRowPerSegment &&
|
||||
Objects.equals(targetRowsPerSegment, that.targetRowsPerSegment) &&
|
||||
Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
|
||||
Objects.equals(partitionDimension, that.partitionDimension);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped);
|
||||
return Objects.hash(
|
||||
targetRowsPerSegment,
|
||||
maxRowsPerSegment,
|
||||
partitionDimension,
|
||||
assumeGrouped,
|
||||
resolvedMaxRowPerSegment
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "SingleDimensionPartitionsSpec{" +
|
||||
"maxRowsPerSegment=" + maxRowsPerSegment +
|
||||
", maxPartitionSize=" + maxPartitionSize +
|
||||
"targetRowsPerSegment=" + targetRowsPerSegment +
|
||||
", maxRowsPerSegment=" + maxRowsPerSegment +
|
||||
", partitionDimension='" + partitionDimension + '\'' +
|
||||
", assumeGrouped=" + assumeGrouped +
|
||||
", resolvedMaxRowPerSegment=" + resolvedMaxRowPerSegment +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexer.partitions;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class ChecksTest
|
||||
{
|
||||
private static final String NAME1 = "name1";
|
||||
private static final Integer VALUE1 = 1;
|
||||
private static final String NAME2 = "name2";
|
||||
private static final Integer VALUE2 = 2;
|
||||
private static final Integer NULL = null;
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void checkAtMostOneNotNullFirstNull()
|
||||
{
|
||||
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, VALUE2);
|
||||
Assert.assertEquals(NAME2, result.getName());
|
||||
Assert.assertEquals(VALUE2, result.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkAtMostOneNotNullSecondNull()
|
||||
{
|
||||
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, NULL);
|
||||
Assert.assertEquals(NAME1, result.getName());
|
||||
Assert.assertEquals(VALUE1, result.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkAtMostOneNotNullBothNull()
|
||||
{
|
||||
Property<Integer> result = Checks.checkAtMostOneNotNull(NAME1, NULL, NAME2, NULL);
|
||||
Assert.assertEquals(NAME1, result.getName());
|
||||
Assert.assertEquals(NULL, result.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkAtMostOneNotNullNeitherNull()
|
||||
{
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("At most one of " + NAME1 + " or " + NAME2 + " must be present");
|
||||
|
||||
//noinspection ConstantConditions (expected to fail)
|
||||
Checks.checkAtMostOneNotNull(NAME1, VALUE1, NAME2, VALUE2);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,279 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexer.partitions;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
public class SingleDimensionPartitionsSpecTest
|
||||
{
|
||||
private static final Integer TARGET_ROWS_PER_SEGMENT = 1;
|
||||
private static final Integer MAX_ROWS_PER_SEGMENT = null;
|
||||
private static final String PARTITION_DIMENSION = "a";
|
||||
private static final boolean ASSUME_GROUPED = false;
|
||||
private static final SingleDimensionPartitionsSpec SPEC = new SingleDimensionPartitionsSpec(
|
||||
TARGET_ROWS_PER_SEGMENT,
|
||||
MAX_ROWS_PER_SEGMENT,
|
||||
PARTITION_DIMENSION,
|
||||
ASSUME_GROUPED
|
||||
);
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void serde()
|
||||
{
|
||||
String json = serialize(SPEC);
|
||||
SingleDimensionPartitionsSpec spec = deserialize(json);
|
||||
Assert.assertEquals(SPEC, spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deserializeWithBackwardCompatibility()
|
||||
{
|
||||
String serialized = "{"
|
||||
+ "\"type\":\"" + SingleDimensionPartitionsSpec.NAME + "\""
|
||||
+ ",\"targetPartitionSize\":" + TARGET_ROWS_PER_SEGMENT // test backward-compatible for this
|
||||
+ ",\"maxPartitionSize\":" + MAX_ROWS_PER_SEGMENT // test backward-compatible for this
|
||||
+ ",\"partitionDimension\":\"" + PARTITION_DIMENSION + "\""
|
||||
+ ",\"assumeGrouped\":" + ASSUME_GROUPED
|
||||
+ "}";
|
||||
SingleDimensionPartitionsSpec spec = deserialize(serialized);
|
||||
Assert.assertEquals(SPEC, spec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void havingBothTargetForbidden()
|
||||
{
|
||||
new Tester()
|
||||
.targetRowsPerSegment(1)
|
||||
.targetPartitionSize(1)
|
||||
.testIllegalArgumentException("At most one of targetRowsPerSegment or targetPartitionSize must be present");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void havingBothMaxForbidden()
|
||||
{
|
||||
new Tester()
|
||||
.maxRowsPerSegment(1)
|
||||
.maxPartitionSize(1)
|
||||
.testIllegalArgumentException("At most one of maxRowsPerSegment or maxPartitionSize must be present");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void havingNeitherTargetNorMaxForbidden()
|
||||
{
|
||||
new Tester()
|
||||
.testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void targetRowsPerSegmentMustBePositive()
|
||||
{
|
||||
new Tester()
|
||||
.targetRowsPerSegment(0)
|
||||
.testIllegalArgumentException("targetRowsPerSegment must be greater than 0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void targetPartitionSizeMustBePositive()
|
||||
{
|
||||
new Tester()
|
||||
.targetPartitionSize(0)
|
||||
.testIllegalArgumentException("targetPartitionSize must be greater than 0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void targetMaxRowsPerSegmentOverflows()
|
||||
{
|
||||
new Tester()
|
||||
.targetRowsPerSegment(Integer.MAX_VALUE)
|
||||
.testIllegalArgumentException("targetRowsPerSegment is too large");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void targetPartitionSizeOverflows()
|
||||
{
|
||||
new Tester()
|
||||
.targetPartitionSize(Integer.MAX_VALUE)
|
||||
.testIllegalArgumentException("targetPartitionSize is too large");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxRowsPerSegmentMustBePositive()
|
||||
{
|
||||
new Tester()
|
||||
.maxRowsPerSegment(0)
|
||||
.testIllegalArgumentException("maxRowsPerSegment must be greater than 0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxPartitionSizeMustBePositive()
|
||||
{
|
||||
new Tester()
|
||||
.maxPartitionSize(0)
|
||||
.testIllegalArgumentException("maxPartitionSize must be greater than 0");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resolvesMaxFromTargetRowsPerSegment()
|
||||
{
|
||||
SingleDimensionPartitionsSpec spec = new Tester()
|
||||
.targetRowsPerSegment(123)
|
||||
.build();
|
||||
Assert.assertEquals(184, spec.getMaxRowsPerSegment().intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resolvesMaxFromTargetPartitionSize()
|
||||
{
|
||||
SingleDimensionPartitionsSpec spec = new Tester()
|
||||
.targetPartitionSize(123)
|
||||
.build();
|
||||
Assert.assertEquals(Integer.valueOf(184), spec.getMaxRowsPerSegment());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resolvesMaxFromMaxRowsPerSegment()
|
||||
{
|
||||
SingleDimensionPartitionsSpec spec = new Tester()
|
||||
.maxRowsPerSegment(123)
|
||||
.build();
|
||||
Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resolvesMaxFromMaxPartitionSize()
|
||||
{
|
||||
SingleDimensionPartitionsSpec spec = new Tester()
|
||||
.maxPartitionSize(123)
|
||||
.build();
|
||||
Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getPartitionDimensionFromNull()
|
||||
{
|
||||
SingleDimensionPartitionsSpec spec = new Tester()
|
||||
.targetPartitionSize(1)
|
||||
.partitionDimension(null)
|
||||
.build();
|
||||
Assert.assertEquals(Collections.emptyList(), spec.getPartitionDimensions());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getPartitionDimensionFromNonNull()
|
||||
{
|
||||
String partitionDimension = "a";
|
||||
SingleDimensionPartitionsSpec spec = new Tester()
|
||||
.targetPartitionSize(1)
|
||||
.partitionDimension(partitionDimension)
|
||||
.build();
|
||||
Assert.assertEquals(Collections.singletonList(partitionDimension), spec.getPartitionDimensions());
|
||||
|
||||
}
|
||||
|
||||
private static String serialize(SingleDimensionPartitionsSpec spec)
|
||||
{
|
||||
try {
|
||||
return OBJECT_MAPPER.writeValueAsString(spec);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static SingleDimensionPartitionsSpec deserialize(String serialized)
|
||||
{
|
||||
try {
|
||||
return OBJECT_MAPPER.readValue(serialized, SingleDimensionPartitionsSpec.class);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private class Tester
|
||||
{
|
||||
private Integer targetRowsPerSegment;
|
||||
private Integer maxRowsPerSegment;
|
||||
private String partitionDimension;
|
||||
private Integer targetPartitionSize;
|
||||
private Integer maxPartitionSize;
|
||||
|
||||
Tester targetRowsPerSegment(Integer targetRowsPerSegment)
|
||||
{
|
||||
this.targetRowsPerSegment = targetRowsPerSegment;
|
||||
return this;
|
||||
}
|
||||
|
||||
Tester maxRowsPerSegment(Integer maxRowsPerSegment)
|
||||
{
|
||||
this.maxRowsPerSegment = maxRowsPerSegment;
|
||||
return this;
|
||||
}
|
||||
|
||||
Tester partitionDimension(String partitionDimension)
|
||||
{
|
||||
this.partitionDimension = partitionDimension;
|
||||
return this;
|
||||
}
|
||||
|
||||
Tester targetPartitionSize(Integer targetPartitionSize)
|
||||
{
|
||||
this.targetPartitionSize = targetPartitionSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
Tester maxPartitionSize(Integer maxPartitionSize)
|
||||
{
|
||||
this.maxPartitionSize = maxPartitionSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
void testIllegalArgumentException(String exceptionExpectedMessage)
|
||||
{
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage(exceptionExpectedMessage);
|
||||
build();
|
||||
}
|
||||
|
||||
SingleDimensionPartitionsSpec build()
|
||||
{
|
||||
return new SingleDimensionPartitionsSpec(
|
||||
targetRowsPerSegment,
|
||||
maxRowsPerSegment,
|
||||
partitionDimension,
|
||||
SingleDimensionPartitionsSpecTest.ASSUME_GROUPED,
|
||||
targetPartitionSize,
|
||||
maxPartitionSize
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -285,7 +285,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|
|||
|jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
|
||||
|indexSpec|Object|Tune how data is indexed. See [`indexSpec`](index.md#indexspec) on the main ingestion page for more information.|no|
|
||||
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [`indexSpec`](index.md#indexspec) for possible values.|no (default = same as indexSpec)|
|
||||
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|
||||
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|
||||
|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitionsspec). This option can be useful when you need to append more data to existing dataSource.|no (default = false)|
|
||||
|useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)|
|
||||
|logParseExceptions|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|false|no|
|
||||
|
@ -324,7 +324,7 @@ sized data segments relative to single-dimension partitioning.
|
|||
```json
|
||||
"partitionsSpec": {
|
||||
"type": "hashed",
|
||||
"targetPartitionSize": 5000000
|
||||
"targetRowsPerSegment": 5000000
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -337,16 +337,19 @@ The configuration options are:
|
|||
|Field|Description|Required|
|
||||
|--------|-----------|---------|
|
||||
|type|Type of partitionSpec to be used.|"hashed"|
|
||||
|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards|
|
||||
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize|
|
||||
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards, will be ignored when targetPartitionSize is set|no|
|
||||
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|
||||
|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|
||||
|maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|
||||
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`|
|
||||
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|no|
|
||||
|
||||
|
||||
### Single-dimension range partitioning
|
||||
|
||||
```json
|
||||
"partitionsSpec": {
|
||||
"type": "single_dim",
|
||||
"targetPartitionSize": 5000000
|
||||
"targetRowsPerSegment": 5000000
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -361,8 +364,10 @@ The configuration options are:
|
|||
|Field|Description|Required|
|
||||
|--------|-----------|---------|
|
||||
|type|Type of partitionSpec to be used.|"single_dim"|
|
||||
|targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes|
|
||||
|maxPartitionSize|Maximum number of rows to include in a partition. Defaults to 50% larger than the targetPartitionSize.|no|
|
||||
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes|
|
||||
|targetPartitionSize|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|no|
|
||||
|maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no|
|
||||
|maxPartitionSize|Deprecated. Use `maxRowsPerSegment` instead. Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetPartitionSize`.|no|
|
||||
|partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no|
|
||||
|assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|no|
|
||||
|
||||
|
|
|
@ -229,8 +229,9 @@ For perfect rollup, you should use `hashed`.
|
|||
|property|description|default|required?|
|
||||
|--------|-----------|-------|---------|
|
||||
|type|This should always be `hashed`|none|yes|
|
||||
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|
||||
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|
||||
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|null|either this or `numShards`|
|
||||
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `targetRowsPerSegment` is set.|null|no|
|
||||
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|null|no|
|
||||
|
||||
For best-effort rollup, you should use `dynamic`.
|
||||
|
||||
|
@ -629,7 +630,7 @@ For perfect rollup, you should use `hashed`.
|
|||
|--------|-----------|-------|---------|
|
||||
|type|This should always be `hashed`|none|yes|
|
||||
|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|
||||
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
|
||||
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|
||||
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|
||||
|
||||
For best-effort rollup, you should use `dynamic`.
|
||||
|
|
|
@ -103,7 +103,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
|
||||
private String failureCause;
|
||||
|
||||
public DeterminePartitionsJob(
|
||||
DeterminePartitionsJob(
|
||||
HadoopDruidIndexerConfig config
|
||||
)
|
||||
{
|
||||
|
@ -168,7 +168,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
try {
|
||||
if (!groupByJob.waitForCompletion(true)) {
|
||||
log.error("Job failed: %s", groupByJob.getJobID());
|
||||
failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
|
||||
failureCause = Utils.getFailureMessage(groupByJob, HadoopDruidIndexerConfig.JSON_MAPPER);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -238,7 +238,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
try {
|
||||
if (!dimSelectionJob.waitForCompletion(true)) {
|
||||
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
|
||||
failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER);
|
||||
failureCause = Utils.getFailureMessage(dimSelectionJob, HadoopDruidIndexerConfig.JSON_MAPPER);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
|
||||
}
|
||||
if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) {
|
||||
List<ShardSpec> specs = config.JSON_MAPPER.readValue(
|
||||
List<ShardSpec> specs = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
|
||||
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
|
||||
{
|
||||
}
|
||||
|
@ -298,14 +298,13 @@ public class DeterminePartitionsJob implements Jobby
|
|||
try {
|
||||
Counters jobCounters = groupByJob.getCounters();
|
||||
|
||||
Map<String, Object> metrics = TaskMetricsUtils.makeIngestionRowMetrics(
|
||||
return TaskMetricsUtils.makeIngestionRowMetrics(
|
||||
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).getValue(),
|
||||
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER).getValue(),
|
||||
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER)
|
||||
.getValue(),
|
||||
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_UNPARSEABLE_COUNTER).getValue(),
|
||||
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).getValue()
|
||||
);
|
||||
|
||||
return metrics;
|
||||
}
|
||||
catch (IllegalStateException ise) {
|
||||
log.debug("Couldn't get counters due to job state");
|
||||
|
@ -433,13 +432,13 @@ public class DeterminePartitionsJob implements Jobby
|
|||
* Since we have two slightly different DimSelectionMappers, this class encapsulates the shared logic for
|
||||
* emitting dimension value counts.
|
||||
*/
|
||||
public static class DeterminePartitionsDimSelectionMapperHelper
|
||||
static class DeterminePartitionsDimSelectionMapperHelper
|
||||
{
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
private final String partitionDimension;
|
||||
private final Map<Long, Integer> intervalIndexes;
|
||||
|
||||
public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
|
||||
DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
|
||||
{
|
||||
this.config = config;
|
||||
this.partitionDimension = partitionDimension;
|
||||
|
@ -454,7 +453,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
this.intervalIndexes = timeIndexBuilder.build();
|
||||
}
|
||||
|
||||
public void emitDimValueCounts(
|
||||
void emitDimValueCounts(
|
||||
TaskInputOutputContext<?, ?, BytesWritable, Text> context,
|
||||
DateTime timestamp,
|
||||
Map<String, Iterable<String>> dims
|
||||
|
@ -568,7 +567,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
Iterable<DimValueCount> combinedIterable
|
||||
) throws IOException, InterruptedException;
|
||||
|
||||
private Iterable<DimValueCount> combineRows(Iterable<Text> input)
|
||||
private static Iterable<DimValueCount> combineRows(Iterable<Text> input)
|
||||
{
|
||||
return new CombiningIterable<>(
|
||||
Iterables.transform(
|
||||
|
@ -771,7 +770,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
final SingleDimensionPartitionsSpec partitionsSpec =
|
||||
(SingleDimensionPartitionsSpec) config.getPartitionsSpec();
|
||||
for (final DimPartition partition : dimPartitions.partitions) {
|
||||
if (partition.rows > partitionsSpec.getMaxPartitionSize()) {
|
||||
if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
|
||||
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
|
||||
oversized = true;
|
||||
}
|
||||
|
@ -857,7 +856,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
@Override
|
||||
public void checkOutputSpecs(JobContext job) throws IOException
|
||||
{
|
||||
Path outDir = getOutputPath(job);
|
||||
Path outDir = FileOutputFormat.getOutputPath(job);
|
||||
if (outDir == null) {
|
||||
throw new InvalidJobConfException("Output directory not set.");
|
||||
}
|
||||
|
@ -874,7 +873,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
this.dim = dim;
|
||||
}
|
||||
|
||||
public int getCardinality()
|
||||
int getCardinality()
|
||||
{
|
||||
int sum = 0;
|
||||
for (final DimPartition dimPartition : partitions) {
|
||||
|
@ -883,7 +882,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
return sum;
|
||||
}
|
||||
|
||||
public long getDistanceSquaredFromTarget(long target)
|
||||
long getDistanceSquaredFromTarget(long target)
|
||||
{
|
||||
long distance = 0;
|
||||
for (final DimPartition dimPartition : partitions) {
|
||||
|
@ -907,7 +906,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
private static class DimPartition
|
||||
{
|
||||
public ShardSpec shardSpec = null;
|
||||
public int cardinality = 0;
|
||||
int cardinality = 0;
|
||||
public long rows = 0;
|
||||
}
|
||||
|
||||
|
@ -924,12 +923,12 @@ public class DeterminePartitionsJob implements Jobby
|
|||
this.numRows = numRows;
|
||||
}
|
||||
|
||||
public Text toText()
|
||||
Text toText()
|
||||
{
|
||||
return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value));
|
||||
}
|
||||
|
||||
public static DimValueCount fromText(Text text)
|
||||
static DimValueCount fromText(Text text)
|
||||
{
|
||||
final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator();
|
||||
final String dim = splits.next();
|
||||
|
|
|
@ -51,16 +51,15 @@ import java.util.Map;
|
|||
@RunWith(Parameterized.class)
|
||||
public class DeterminePartitionsJobTest
|
||||
{
|
||||
|
||||
private HadoopDruidIndexerConfig config;
|
||||
private int expectedNumOfSegments;
|
||||
private int[] expectedNumOfShardsForEachSegment;
|
||||
private String[][][] expectedStartEndForEachShard;
|
||||
private File dataFile;
|
||||
private File tmpDir;
|
||||
private final HadoopDruidIndexerConfig config;
|
||||
private final int expectedNumOfSegments;
|
||||
private final int[] expectedNumOfShardsForEachSegment;
|
||||
private final String[][][] expectedStartEndForEachShard;
|
||||
private final File dataFile;
|
||||
private final File tmpDir;
|
||||
|
||||
@Parameterized.Parameters(name = "assumeGrouped={0}, "
|
||||
+ "targetPartitionSize={1}, "
|
||||
+ "maxRowsPerSegment={1}, "
|
||||
+ "interval={2}"
|
||||
+ "expectedNumOfSegments={3}, "
|
||||
+ "expectedNumOfShardsForEachSegment={4}, "
|
||||
|
@ -82,7 +81,7 @@ public class DeterminePartitionsJobTest
|
|||
{"c.example.com", "e.example.com"},
|
||||
{"e.example.com", "g.example.com"},
|
||||
{"g.example.com", "i.example.com"},
|
||||
{"i.example.com", null }
|
||||
{"i.example.com", null}
|
||||
}
|
||||
},
|
||||
ImmutableList.of(
|
||||
|
@ -222,7 +221,7 @@ public class DeterminePartitionsJobTest
|
|||
|
||||
public DeterminePartitionsJobTest(
|
||||
boolean assumeGrouped,
|
||||
Integer targetPartitionSize,
|
||||
Integer maxRowsPerSegment,
|
||||
String interval,
|
||||
int expectedNumOfSegments,
|
||||
int[] expectedNumOfShardsForEachSegment,
|
||||
|
@ -249,7 +248,11 @@ public class DeterminePartitionsJobTest
|
|||
new StringInputRowParser(
|
||||
new CSVParseSpec(
|
||||
new TimestampSpec("timestamp", "yyyyMMddHH", null),
|
||||
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null),
|
||||
new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")),
|
||||
null,
|
||||
null
|
||||
),
|
||||
null,
|
||||
ImmutableList.of("timestamp", "host", "country", "visited_num"),
|
||||
false,
|
||||
|
@ -281,7 +284,7 @@ public class DeterminePartitionsJobTest
|
|||
new HadoopTuningConfig(
|
||||
tmpDir.getCanonicalPath(),
|
||||
null,
|
||||
new SingleDimensionPartitionsSpec(targetPartitionSize, null, null, assumeGrouped),
|
||||
new SingleDimensionPartitionsSpec(null, maxRowsPerSegment, null, assumeGrouped),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
@ -319,9 +322,9 @@ public class DeterminePartitionsJobTest
|
|||
Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());
|
||||
|
||||
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : config.getSchema()
|
||||
.getTuningConfig()
|
||||
.getShardSpecs()
|
||||
.entrySet()) {
|
||||
.getTuningConfig()
|
||||
.getShardSpecs()
|
||||
.entrySet()) {
|
||||
int partitionNum = 0;
|
||||
List<HadoopyShardSpec> specs = entry.getValue();
|
||||
Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size());
|
||||
|
|
|
@ -150,8 +150,8 @@ public class HadoopIngestionSpecTest
|
|||
|
||||
Assert.assertEquals(
|
||||
"getTargetPartitionSize",
|
||||
partitionsSpec.getMaxRowsPerSegment().intValue(),
|
||||
100
|
||||
100,
|
||||
partitionsSpec.getMaxRowsPerSegment().intValue()
|
||||
);
|
||||
|
||||
Assert.assertTrue(
|
||||
|
@ -167,14 +167,13 @@ public class HadoopIngestionSpecTest
|
|||
|
||||
try {
|
||||
schema = jsonReadWriteRead(
|
||||
|
||||
"{\n"
|
||||
+ " \"tuningConfig\": {\n"
|
||||
+ " \"type\": \"hadoop\",\n"
|
||||
+ " \"partitionsSpec\": {\n"
|
||||
+ " \"type\": \"dimension\",\n"
|
||||
+ " \"targetPartitionSize\": 100,\n"
|
||||
+ " \"maxPartitionSize\" : 200,\n"
|
||||
+ " \"maxPartitionSize\" : null,\n"
|
||||
+ " \"partitionDimension\" : \"foo\"\n"
|
||||
+ " }\n"
|
||||
+ " }\n"
|
||||
|
@ -186,32 +185,29 @@ public class HadoopIngestionSpecTest
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
final SingleDimensionPartitionsSpec partitionsSpec =
|
||||
(SingleDimensionPartitionsSpec) schema.getTuningConfig().getPartitionsSpec();
|
||||
PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
|
||||
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
|
||||
|
||||
Assert.assertEquals(
|
||||
"isDeterminingPartitions",
|
||||
partitionsSpec.needsDeterminePartitions(true),
|
||||
true
|
||||
);
|
||||
SingleDimensionPartitionsSpec singleDimensionPartitionsSpec = (SingleDimensionPartitionsSpec) partitionsSpec;
|
||||
|
||||
Assert.assertTrue("isDeterminingPartitions", singleDimensionPartitionsSpec.needsDeterminePartitions(true));
|
||||
|
||||
Assert.assertEquals(
|
||||
"getTargetPartitionSize",
|
||||
partitionsSpec.getMaxRowsPerSegment().intValue(),
|
||||
100
|
||||
100,
|
||||
singleDimensionPartitionsSpec.getTargetRowsPerSegment().intValue()
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"getMaxPartitionSize",
|
||||
partitionsSpec.getMaxPartitionSize(),
|
||||
200
|
||||
150,
|
||||
singleDimensionPartitionsSpec.getMaxRowsPerSegment().intValue()
|
||||
);
|
||||
|
||||
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
|
||||
Assert.assertEquals(
|
||||
"getPartitionDimension",
|
||||
partitionsSpec.getPartitionDimension(),
|
||||
"foo"
|
||||
"foo",
|
||||
singleDimensionPartitionsSpec.getPartitionDimension()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -262,15 +258,11 @@ public class HadoopIngestionSpecTest
|
|||
|
||||
Assert.assertEquals(
|
||||
"cleanupOnFailure",
|
||||
schema.getTuningConfig().isCleanupOnFailure(),
|
||||
true
|
||||
true,
|
||||
schema.getTuningConfig().isCleanupOnFailure()
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"overwriteFiles",
|
||||
schema.getTuningConfig().isOverwriteFiles(),
|
||||
false
|
||||
);
|
||||
Assert.assertFalse("overwriteFiles", schema.getTuningConfig().isOverwriteFiles());
|
||||
|
||||
Assert.assertFalse(
|
||||
"isDeterminingPartitions",
|
||||
|
@ -324,14 +316,10 @@ public class HadoopIngestionSpecTest
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
"cleanupOnFailure",
|
||||
schema.getTuningConfig().isCleanupOnFailure(),
|
||||
false
|
||||
);
|
||||
Assert.assertFalse("cleanupOnFailure", schema.getTuningConfig().isCleanupOnFailure());
|
||||
}
|
||||
|
||||
private <T> T jsonReadWriteRead(String s, Class<T> klass)
|
||||
private static <T> T jsonReadWriteRead(String s, Class<T> klass)
|
||||
{
|
||||
try {
|
||||
return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass);
|
||||
|
|
|
@ -23,90 +23,129 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HashedPartitionsSpecTest
|
||||
{
|
||||
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testHashedPartitionsSpec()
|
||||
{
|
||||
{
|
||||
final PartitionsSpec partitionsSpec = jsonReadWriteRead(
|
||||
"{"
|
||||
+ " \"targetPartitionSize\":100,"
|
||||
+ " \"type\":\"hashed\""
|
||||
+ "}",
|
||||
PartitionsSpec.class
|
||||
);
|
||||
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
|
||||
final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec;
|
||||
final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(
|
||||
"{"
|
||||
+ " \"targetRowsPerSegment\":100,"
|
||||
+ " \"type\":\"hashed\""
|
||||
+ "}"
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"isDeterminingPartitions",
|
||||
hadoopHashedPartitionsSpec.needsDeterminePartitions(true),
|
||||
true
|
||||
);
|
||||
Assert.assertTrue("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true));
|
||||
|
||||
Assert.assertEquals(
|
||||
"getTargetPartitionSize",
|
||||
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue(),
|
||||
100
|
||||
);
|
||||
Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(
|
||||
"getMaxRowsPerSegment",
|
||||
100,
|
||||
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue()
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"getPartitionDimensions",
|
||||
hadoopHashedPartitionsSpec.getPartitionDimensions(),
|
||||
ImmutableList.of()
|
||||
);
|
||||
}
|
||||
Assert.assertEquals(
|
||||
"getPartitionDimensions",
|
||||
ImmutableList.of(),
|
||||
hadoopHashedPartitionsSpec.getPartitionDimensions()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHashedPartitionsSpecShardCount()
|
||||
{
|
||||
final PartitionsSpec partitionsSpec = jsonReadWriteRead(
|
||||
final HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(
|
||||
"{"
|
||||
+ " \"type\":\"hashed\","
|
||||
+ " \"numShards\":2"
|
||||
+ "}",
|
||||
PartitionsSpec.class
|
||||
+ "}"
|
||||
);
|
||||
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
|
||||
final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec;
|
||||
|
||||
Assert.assertEquals(
|
||||
"isDeterminingPartitions",
|
||||
hadoopHashedPartitionsSpec.needsDeterminePartitions(true),
|
||||
false
|
||||
);
|
||||
Assert.assertFalse("isDeterminingPartitions", hadoopHashedPartitionsSpec.needsDeterminePartitions(true));
|
||||
|
||||
Assert.assertNull(
|
||||
"getTargetPartitionSize",
|
||||
"getMaxRowsPerSegment",
|
||||
hadoopHashedPartitionsSpec.getMaxRowsPerSegment()
|
||||
);
|
||||
|
||||
Assert.assertNotNull(hadoopHashedPartitionsSpec.getNumShards());
|
||||
Assert.assertEquals(
|
||||
"shardCount",
|
||||
hadoopHashedPartitionsSpec.getNumShards().intValue(),
|
||||
2
|
||||
2,
|
||||
hadoopHashedPartitionsSpec.getNumShards().intValue()
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
"getPartitionDimensions",
|
||||
hadoopHashedPartitionsSpec.getPartitionDimensions(),
|
||||
ImmutableList.of()
|
||||
ImmutableList.of(),
|
||||
hadoopHashedPartitionsSpec.getPartitionDimensions()
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
private <T> T jsonReadWriteRead(String s, Class<T> klass)
|
||||
@Test
|
||||
public void testHashedPartitionsSpecBothTargetForbidden()
|
||||
{
|
||||
exception.expect(RuntimeException.class);
|
||||
exception.expectMessage("At most one of targetRowsPerSegment or targetPartitionSize must be present");
|
||||
|
||||
String json = "{"
|
||||
+ "\"type\":\"hashed\""
|
||||
+ ",\"targetRowsPerSegment\":100"
|
||||
+ ",\"targetPartitionSize\":100"
|
||||
+ "}";
|
||||
jsonReadWriteRead(json);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHashedPartitionsSpecBackwardCompatibleTargetPartitionSize()
|
||||
{
|
||||
String json = "{"
|
||||
+ "\"type\":\"hashed\""
|
||||
+ ",\"targetPartitionSize\":100"
|
||||
+ "}";
|
||||
HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(json);
|
||||
|
||||
Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(
|
||||
"getMaxRowsPerSegment",
|
||||
100,
|
||||
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHashedPartitionsSpecBackwardCompatibleMaxRowsPerSegment()
|
||||
{
|
||||
String json = "{"
|
||||
+ "\"type\":\"hashed\""
|
||||
+ ",\"maxRowsPerSegment\":100"
|
||||
+ "}";
|
||||
HashedPartitionsSpec hadoopHashedPartitionsSpec = jsonReadWriteRead(json);
|
||||
|
||||
Assert.assertNotNull(hadoopHashedPartitionsSpec.getMaxRowsPerSegment());
|
||||
Assert.assertEquals(
|
||||
"getMaxRowsPerSegment",
|
||||
100,
|
||||
hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue()
|
||||
);
|
||||
}
|
||||
|
||||
private static HashedPartitionsSpec jsonReadWriteRead(String s)
|
||||
{
|
||||
try {
|
||||
return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass);
|
||||
byte[] jsonBytes = JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, PartitionsSpec.class));
|
||||
PartitionsSpec partitionsSpec = JSON_MAPPER.readValue(jsonBytes, PartitionsSpec.class);
|
||||
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
|
||||
return (HashedPartitionsSpec) partitionsSpec;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
|
|
@ -864,6 +864,7 @@ segmentTable
|
|||
shardSpec
|
||||
single_dim
|
||||
targetPartitionSize
|
||||
targetRowsPerSegment
|
||||
useCombiner
|
||||
useExplicitVersion
|
||||
useNewAggs
|
||||
|
|
Loading…
Reference in New Issue