Store hash partition function in dataSegment and allow segment pruning only when hash partition function is provided (#10288)

* Store hash partition function in dataSegment and allow segment pruning only when hash partition function is provided

* query context

* fix tests; add more test

* javadoc

* docs and more tests

* remove default and hadoop tests

* consistent name and fix javadoc

* spelling and field name

* default function for partitionsSpec

* other comments

* address comments

* fix tests and spelling

* test

* doc
Jihoon Son 2020-09-24 16:32:56 -07:00 committed by GitHub
parent cb30b1fe23
commit 0cc9eb4903
59 changed files with 1315 additions and 395 deletions


@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import javax.annotation.Nullable;
import java.util.Collections;
@ -38,16 +39,18 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
static final String NUM_SHARDS = "numShards";
private static final String FORCE_GUARANTEED_ROLLUP_COMPATIBLE = "";
private static final HashPartitionFunction DEFAULT_HASH_FUNCTION = HashPartitionFunction.MURMUR3_32_ABS;
@Nullable
private final Integer maxRowsPerSegment;
@Nullable
private final Integer numShards;
private final List<String> partitionDimensions;
private final HashPartitionFunction partitionFunction;
public static HashedPartitionsSpec defaultSpec()
{
return new HashedPartitionsSpec(null, null, null, null, null);
return new HashedPartitionsSpec(null, null, null, null, null, null);
}
@JsonCreator
@ -55,6 +58,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
@JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
@JsonProperty(NUM_SHARDS) @Nullable Integer numShards,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction,
// Deprecated properties preserved for backward compatibility:
@Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
@ -84,6 +88,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
Checks.checkAtMostOneNotNull(target, new Property<>(NUM_SHARDS, adjustedNumShards));
this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
this.partitionFunction = partitionFunction == null ? DEFAULT_HASH_FUNCTION : partitionFunction;
this.numShards = adjustedNumShards;
// Supply default for targetRowsPerSegment if needed
@ -107,13 +112,23 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
);
}
public HashedPartitionsSpec(
@Nullable Integer maxRowsPerSegment,
@Nullable Integer numShards,
@Nullable List<String> partitionDimensions,
@Nullable HashPartitionFunction partitionFunction
)
{
this(null, numShards, partitionDimensions, partitionFunction, maxRowsPerSegment, null);
}
public HashedPartitionsSpec(
@Nullable Integer maxRowsPerSegment,
@Nullable Integer numShards,
@Nullable List<String> partitionDimensions
)
{
this(null, numShards, partitionDimensions, null, maxRowsPerSegment);
this(null, numShards, partitionDimensions, null, maxRowsPerSegment, null);
}
@Nullable
@ -157,6 +172,12 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
return partitionDimensions;
}
@JsonProperty
public HashPartitionFunction getPartitionFunction()
{
return partitionFunction;
}
@Override
public String getForceGuaranteedRollupIncompatiblityReason()
{
@ -175,13 +196,14 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
HashedPartitionsSpec that = (HashedPartitionsSpec) o;
return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(numShards, that.numShards) &&
Objects.equals(partitionDimensions, that.partitionDimensions);
Objects.equals(partitionDimensions, that.partitionDimensions) &&
partitionFunction == that.partitionFunction;
}
@Override
public int hashCode()
{
return Objects.hash(maxRowsPerSegment, numShards, partitionDimensions);
return Objects.hash(maxRowsPerSegment, numShards, partitionDimensions, partitionFunction);
}
@Override
@ -191,6 +213,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
"maxRowsPerSegment=" + maxRowsPerSegment +
", numShards=" + numShards +
", partitionDimensions=" + partitionDimensions +
", partitionFunction=" + partitionFunction +
'}';
}
}
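
Below is a brief usage sketch, not part of the patch, illustrating the constructors shown in this hunk: when `partitionFunction` is omitted (null), the spec falls back to `DEFAULT_HASH_FUNCTION`, i.e. `murmur3_32_abs`, while the new four-argument constructor lets callers set it explicitly.

```java
// Hypothetical sketch (not from the diff) using the constructors added in this patch.
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;

public class HashedPartitionsSpecExample
{
  public static void main(String[] args)
  {
    // partitionFunction omitted: defaults to MURMUR3_32_ABS.
    HashedPartitionsSpec defaulted = new HashedPartitionsSpec(null, 3, ImmutableList.of("visitor_id"));
    System.out.println(defaulted.getPartitionFunction()); // murmur3_32_abs

    // partitionFunction set explicitly via the new 4-argument constructor.
    HashedPartitionsSpec explicit = new HashedPartitionsSpec(
        null,
        3,
        ImmutableList.of("visitor_id"),
        HashPartitionFunction.MURMUR3_32_ABS
    );
    System.out.println(explicit.getPartitionFunction()); // murmur3_32_abs
  }
}
```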


@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import java.util.List;
@ -41,6 +42,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
private final int bucketId;
private final int numBuckets;
private final List<String> partitionDimensions;
private final HashPartitionFunction partitionFunction;
private final ObjectMapper jsonMapper;
@JsonCreator
@ -49,6 +51,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
@JsonProperty("bucketId") int bucketId,
@JsonProperty("numBuckets") int numBuckets,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JsonProperty("partitionFunction") HashPartitionFunction partitionFunction,
@JacksonInject ObjectMapper jsonMapper
)
{
@ -58,6 +61,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
this.partitionDimensions = partitionDimensions == null
? HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS
: partitionDimensions;
this.partitionFunction = Preconditions.checkNotNull(partitionFunction, "partitionFunction");
this.jsonMapper = jsonMapper;
}
@ -87,6 +91,12 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
return partitionDimensions;
}
@JsonProperty
public HashPartitionFunction getPartitionFunction()
{
return partitionFunction;
}
@Override
public <T> PartitionChunk<T> createChunk(T obj)
{
@ -105,6 +115,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
bucketId,
numBuckets,
partitionDimensions,
partitionFunction,
jsonMapper
);
}
@ -122,13 +133,14 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
return partitionId == that.partitionId &&
bucketId == that.bucketId &&
numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
Objects.equals(partitionDimensions, that.partitionDimensions) &&
partitionFunction == that.partitionFunction;
}
@Override
public int hashCode()
{
return Objects.hash(partitionId, bucketId, numBuckets, partitionDimensions);
return Objects.hash(partitionId, bucketId, numBuckets, partitionDimensions, partitionFunction);
}
@Override
@ -139,6 +151,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<Has
", bucketId=" + bucketId +
", numBuckets=" + numBuckets +
", partitionDimensions=" + partitionDimensions +
", partitionFunction=" + partitionFunction +
'}';
}
}


@ -21,7 +21,6 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;
import java.util.List;
import java.util.Map;
@ -76,12 +75,6 @@ public interface BuildingShardSpec<T extends ShardSpec> extends ShardSpec
// The below methods are used on the query side, and so must not be called for this shardSpec.
@Override
default boolean isInChunk(long timestamp, InputRow inputRow)
{
throw new UnsupportedOperationException();
}
@JsonIgnore
@Override
default List<String> getDomainDimensions()


@ -35,17 +35,21 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
private final List<String> partitionDimensions;
private final int bucketId;
private final int numBuckets;
@Nullable
private final HashPartitionFunction partitionFunction;
@JsonCreator
public HashBasedNumberedPartialShardSpec(
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JsonProperty("bucketId") int bucketId,
@JsonProperty("numPartitions") int numBuckets
@JsonProperty("numPartitions") int numBuckets,
@JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction // nullable for backward compatibility
)
{
this.partitionDimensions = partitionDimensions;
this.bucketId = bucketId;
this.numBuckets = numBuckets;
this.partitionFunction = partitionFunction;
}
@Nullable
@ -67,6 +71,13 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
return numBuckets;
}
@JsonProperty
@Nullable
public HashPartitionFunction getPartitionFunction()
{
return partitionFunction;
}
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions)
{
@ -76,6 +87,7 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
bucketId,
numBuckets,
partitionDimensions,
partitionFunction,
objectMapper
);
}
@ -98,12 +110,13 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
HashBasedNumberedPartialShardSpec that = (HashBasedNumberedPartialShardSpec) o;
return bucketId == that.bucketId &&
numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
Objects.equals(partitionDimensions, that.partitionDimensions) &&
Objects.equals(partitionFunction, that.partitionFunction);
}
@Override
public int hashCode()
{
return Objects.hash(partitionDimensions, bucketId, numBuckets);
return Objects.hash(partitionDimensions, bucketId, numBuckets, partitionFunction);
}
}
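
A small illustration, not part of the diff, of how the new field flows through segment allocation: `complete` turns the partial spec into a `HashBasedNumberedShardSpec`, carrying the partition function along with the bucket information. This mirrors the unit test shown further below; a plain Jackson `ObjectMapper` is assumed here.

```java
// Hypothetical sketch: completing a partial shard spec preserves partitionFunction.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.ShardSpec;

public class PartialShardSpecExample
{
  public static void main(String[] args)
  {
    final HashBasedNumberedPartialShardSpec partial = new HashBasedNumberedPartialShardSpec(
        ImmutableList.of("dim"),
        2,                                   // bucketId
        4,                                   // numBuckets
        HashPartitionFunction.MURMUR3_32_ABS
    );
    // complete() fills in the partition ID and core partition count assigned at segment allocation time.
    final ShardSpec shardSpec = partial.complete(new ObjectMapper(), 1, 3);
    // The completed shard spec carries the partition function into the stored segment metadata.
    System.out.println(((HashBasedNumberedShardSpec) shardSpec).getPartitionFunction()); // murmur3_32_abs
  }
}
```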


@ -21,20 +21,15 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable;
import java.util.Collections;
@ -49,17 +44,29 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
public static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
private final int bucketId;
/**
* Number of hash buckets
*/
private final int numBuckets;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final List<String> partitionDimensions;
/**
* A hash function to use for both hash partitioning at ingestion time and pruning segments at query time.
*
* During ingestion, the partition function is defaulted to {@link HashPartitionFunction#MURMUR3_32_ABS} if this
* variable is null. See {@link HashPartitioner} for details.
*
* During query, this function will be null unless it is explicitly specified in
* {@link org.apache.druid.indexer.partitions.HashedPartitionsSpec} at ingestion time. This is because the default
* hash function used to create segments at ingestion time can change over time, but we don't guarantee the changed
* hash function is backwards-compatible. The query will process all segments if this function is null.
*/
@Nullable
private final HashPartitionFunction partitionFunction;
@JsonCreator
public HashBasedNumberedShardSpec(
@JsonProperty("partitionNum") int partitionNum, // partitionId
@ -67,6 +74,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
@JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility
@JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction, // nullable for backward compatibility
@JacksonInject ObjectMapper jsonMapper
)
{
@ -76,8 +84,9 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
// If numBuckets is missing, assume that any hash bucket is not empty.
// Use the core partition set size as the number of buckets.
this.numBuckets = numBuckets == null ? partitions : numBuckets;
this.jsonMapper = jsonMapper;
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
this.partitionFunction = partitionFunction;
this.jsonMapper = jsonMapper;
}
@JsonProperty
@ -98,140 +107,44 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
return partitionDimensions;
}
@JsonProperty
public @Nullable HashPartitionFunction getPartitionFunction()
{
return partitionFunction;
}
@Override
public List<String> getDomainDimensions()
{
return partitionDimensions;
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
return getBucketIndex(hash(timestamp, inputRow), numBuckets) == bucketId % numBuckets;
}
/**
* Check if the current segment possibly holds records if the values of dimensions in {@link #partitionDimensions}
* are of {@code partitionDimensionsValues}
*
* @param partitionDimensionsValues An instance of values of dimensions in {@link #partitionDimensions}
*
* @return Whether the current segment possibly holds records for the given values of partition dimensions
*/
private boolean isInChunk(Map<String, String> partitionDimensionsValues)
{
assert !partitionDimensions.isEmpty();
List<Object> groupKey = Lists.transform(
partitionDimensions,
o -> Collections.singletonList(partitionDimensionsValues.get(o))
);
try {
return getBucketIndex(hash(jsonMapper, groupKey), numBuckets) == bucketId % numBuckets;
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
/**
* This method calculates the hash based on whether {@param partitionDimensions} is null or not.
* If yes, then both {@param timestamp} and dimension columns in {@param inputRow} are used {@link Rows#toGroupKey}
* Or else, columns in {@param partitionDimensions} are used
*
* @param timestamp should be bucketed with query granularity
* @param inputRow row from input data
*
* @return hash value
*/
protected int hash(long timestamp, InputRow inputRow)
{
return hash(jsonMapper, partitionDimensions, timestamp, inputRow);
}
public static int hash(ObjectMapper jsonMapper, List<String> partitionDimensions, long timestamp, InputRow inputRow)
{
final List<Object> groupKey = getGroupKey(partitionDimensions, timestamp, inputRow);
try {
return hash(jsonMapper, groupKey);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
{
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
} else {
return Lists.transform(partitionDimensions, inputRow::getDimension);
}
}
@VisibleForTesting
public static int hash(ObjectMapper jsonMapper, List<Object> objects) throws JsonProcessingException
{
return HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(objects)).asInt();
}
@Override
public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
{
return createHashLookup(jsonMapper, partitionDimensions, shardSpecs, numBuckets);
}
static ShardSpecLookup createHashLookup(
ObjectMapper jsonMapper,
List<String> partitionDimensions,
List<? extends ShardSpec> shardSpecs,
int numBuckets
)
{
return (long timestamp, InputRow row) -> {
int index = getBucketIndex(hash(jsonMapper, partitionDimensions, timestamp, row), numBuckets);
return shardSpecs.get(index);
};
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
// partitionFunction can be null when you read a shardSpec of a segment created in an old version of Druid.
// The current version of Druid will always specify a partitionFunction on newly created segments.
if (partitionFunction == null) {
throw new ISE("Cannot create a hashPartitioner since partitionFunction is null");
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
HashBasedNumberedShardSpec that = (HashBasedNumberedShardSpec) o;
return bucketId == that.bucketId &&
numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), bucketId, numBuckets, partitionDimensions);
}
@Override
public String toString()
{
return "HashBasedNumberedShardSpec{" +
"partitionNum=" + getPartitionNum() +
", partitions=" + getNumCorePartitions() +
", bucketId=" + bucketId +
", numBuckets=" + numBuckets +
", partitionDimensions=" + partitionDimensions +
'}';
return new HashPartitioner(
jsonMapper,
partitionFunction,
partitionDimensions,
numBuckets
).createHashLookup(shardSpecs);
}
@Override
public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
{
// partitionFunction should be used instead of HashPartitioner at query time.
// We should process all segments if partitionFunction is null because we don't know what hash function
// was used to create segments at ingestion time.
if (partitionFunction == null) {
return true;
}
// If no partitionDimensions are specified during ingestion, hash is based on all dimensions plus the truncated
// input timestamp according to QueryGranularity instead of just partitionDimensions. Since we don't store in shard
// specs the truncated timestamps of the events that fall into the shard after ingestion, there's no way to recover
@ -260,35 +173,36 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
}
}
return !domainSet.isEmpty() && chunkPossibleInDomain(domainSet, new HashMap<>());
return !domainSet.isEmpty() && chunkPossibleInDomain(partitionFunction, domainSet, new HashMap<>());
}
/**
* Recursively enumerate all possible combinations of values for dimensions in {@link #partitionDimensions} based on
* {@code domainSet}, test if any combination matches the current segment
*
* @param hashPartitionFunction hash function used to create segments at ingestion time
* @param domainSet The set where values of dimensions in {@link #partitionDimensions} are
* drawn from
* @param partitionDimensionsValues A map from dimensions in {@link #partitionDimensions} to their values drawn from
* {@code domainSet}
*
* @return Whether the current segment possibly holds records for the provided domain. Return false if and only if
* none of the combinations matches this segment
*/
private boolean chunkPossibleInDomain(
HashPartitionFunction hashPartitionFunction,
Map<String, Set<String>> domainSet,
Map<String, String> partitionDimensionsValues
)
{
int curIndex = partitionDimensionsValues.size();
if (curIndex == partitionDimensions.size()) {
return isInChunk(partitionDimensionsValues);
return isInChunk(hashPartitionFunction, partitionDimensionsValues);
}
String dimension = partitionDimensions.get(curIndex);
for (String e : domainSet.get(dimension)) {
partitionDimensionsValues.put(dimension, e);
if (chunkPossibleInDomain(domainSet, partitionDimensionsValues)) {
if (chunkPossibleInDomain(hashPartitionFunction, domainSet, partitionDimensionsValues)) {
return true;
}
partitionDimensionsValues.remove(dimension);
@ -297,8 +211,73 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
return false;
}
private static int getBucketIndex(int hash, int numBuckets)
/**
* Check if the current segment possibly holds records if the values of dimensions in {@link #partitionDimensions}
* are of {@code partitionDimensionsValues}
*
* @param hashPartitionFunction hash function used to create segments at ingestion time
* @param partitionDimensionsValues An instance of values of dimensions in {@link #partitionDimensions}
*
* @return Whether the current segment possibly holds records for the given values of partition dimensions
*/
private boolean isInChunk(HashPartitionFunction hashPartitionFunction, Map<String, String> partitionDimensionsValues)
{
return Math.abs(hash % numBuckets);
assert !partitionDimensions.isEmpty();
List<Object> groupKey = Lists.transform(
partitionDimensions,
o -> Collections.singletonList(partitionDimensionsValues.get(o))
);
return hashPartitionFunction.hash(serializeGroupKey(jsonMapper, groupKey), numBuckets) == bucketId;
}
/**
* Serializes a group key into a byte array. The serialization algorithm can affect hash values of partition keys
* since {@link HashPartitionFunction#hash} takes the result of this method as its input. This means, the returned
* byte array should be backwards-compatible in cases where we need to modify this method.
*/
public static byte[] serializeGroupKey(ObjectMapper jsonMapper, List<Object> partitionKeys)
{
try {
return jsonMapper.writeValueAsBytes(partitionKeys);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
HashBasedNumberedShardSpec that = (HashBasedNumberedShardSpec) o;
return bucketId == that.bucketId &&
numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions) &&
partitionFunction == that.partitionFunction;
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), bucketId, numBuckets, partitionDimensions, partitionFunction);
}
@Override
public String toString()
{
return "HashBasedNumberedShardSpec{" +
"bucketId=" + bucketId +
", numBuckets=" + numBuckets +
", partitionDimensions=" + partitionDimensions +
", partitionFunction=" + partitionFunction +
'}';
}
}
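
To make the pruning rule above concrete, here is a sketch, not part of the patch and mirroring the tests further below: three hash buckets partitioned on `visitor_id` with the partition function stored, and a query-time domain that pins `visitor_id` to a single value, so only one of the three segments can possibly match.

```java
// Hypothetical sketch of broker-side segment pruning via possibleInDomain.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HashPruningExample
{
  public static void main(String[] args)
  {
    final ObjectMapper mapper = new ObjectMapper();
    final int numBuckets = 3;
    final List<HashBasedNumberedShardSpec> shardSpecs = new ArrayList<>();
    for (int i = 0; i < numBuckets; i++) {
      shardSpecs.add(
          new HashBasedNumberedShardSpec(
              i,                                     // partitionNum
              numBuckets,                            // core partition count
              i,                                     // bucketId
              numBuckets,                            // numBuckets
              ImmutableList.of("visitor_id"),        // partitionDimensions
              HashPartitionFunction.MURMUR3_32_ABS,  // stored partition function
              mapper
          )
      );
    }

    // A filter equivalent to visitor_id = '123'.
    final RangeSet<String> rangeSet = TreeRangeSet.create();
    rangeSet.add(Range.closed("123", "123"));
    final Map<String, RangeSet<String>> domain = ImmutableMap.of("visitor_id", rangeSet);

    // Only the bucket that '123' hashes to can match; the broker can skip the other two.
    System.out.println(shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count()); // 1
  }
}
```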


@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputRow;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Objects;
@ -40,6 +40,7 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec<BuildingHash
private final int bucketId;
private final int numBuckets;
private final List<String> partitionDimensions;
private final HashPartitionFunction partitionFunction;
private final ObjectMapper jsonMapper;
@JsonCreator
@ -47,6 +48,7 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec<BuildingHash
@JsonProperty("bucketId") int bucketId,
@JsonProperty("numBuckets") int numBuckets,
@JsonProperty("partitionDimensions") List<String> partitionDimensions,
@JsonProperty("partitionFunction") HashPartitionFunction partitionFunction,
@JacksonInject ObjectMapper jsonMapper
)
{
@ -55,6 +57,7 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec<BuildingHash
this.partitionDimensions = partitionDimensions == null
? HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS
: partitionDimensions;
this.partitionFunction = Preconditions.checkNotNull(partitionFunction, "partitionFunction");
this.jsonMapper = jsonMapper;
}
@ -77,23 +80,34 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec<BuildingHash
return partitionDimensions;
}
@Override
public BuildingHashBasedNumberedShardSpec convert(int partitionId)
@JsonProperty
public HashPartitionFunction getPartitionFunction()
{
return new BuildingHashBasedNumberedShardSpec(partitionId, bucketId, numBuckets, partitionDimensions, jsonMapper);
return partitionFunction;
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
public BuildingHashBasedNumberedShardSpec convert(int partitionId)
{
// not in use
throw new UnsupportedOperationException();
return new BuildingHashBasedNumberedShardSpec(
partitionId,
bucketId,
numBuckets,
partitionDimensions,
partitionFunction,
jsonMapper
);
}
@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return HashBasedNumberedShardSpec.createHashLookup(jsonMapper, partitionDimensions, shardSpecs, numBuckets);
return new HashPartitioner(
jsonMapper,
partitionFunction,
partitionDimensions,
numBuckets
).createHashLookup(shardSpecs);
}
@Override
@ -108,22 +122,24 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec<BuildingHash
HashBucketShardSpec that = (HashBucketShardSpec) o;
return bucketId == that.bucketId &&
numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
Objects.equals(partitionDimensions, that.partitionDimensions) &&
partitionFunction == that.partitionFunction;
}
@Override
public int hashCode()
{
return Objects.hash(bucketId, numBuckets, partitionDimensions);
return Objects.hash(bucketId, numBuckets, partitionDimensions, partitionFunction);
}
@Override
public String toString()
{
return "HashBucket{" +
", bucketId=" + bucketId +
return "HashBucketShardSpec{" +
"bucketId=" + bucketId +
", numBuckets=" + numBuckets +
", partitionDimensions=" + partitionDimensions +
", partitionFunction=" + partitionFunction +
'}';
}
}


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.hash.Hashing;
import org.apache.druid.java.util.common.StringUtils;
/**
* An enum of supported hash partition functions. This enum should be updated when we want to use a new function
* for hash partitioning. All partition functions listed in this enum must be backwards-compatible as the hash
* function should apply to all segments in the same way no matter what Druid version was used to create those segments.
* This function is a part of {@link HashBasedNumberedShardSpec} which is stored in the metadata store.
*/
public enum HashPartitionFunction
{
MURMUR3_32_ABS {
@Override
public int hash(byte[] serializedRow, int numBuckets)
{
return Math.abs(
Hashing.murmur3_32().hashBytes(serializedRow).asInt() % numBuckets
);
}
};
/**
* Returns an ID of a hash bucket for the given {@code serializedRow}.
*/
public abstract int hash(byte[] serializedRow, int numBuckets);
@JsonCreator
public static HashPartitionFunction fromString(String type)
{
return HashPartitionFunction.valueOf(StringUtils.toUpperCase(type));
}
@Override
@JsonValue
public String toString()
{
return StringUtils.toLowerCase(name());
}
}
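
As a standalone illustration, not part of the patch, this is how the function maps a serialized group key to a bucket ID; the key serialization comes from `HashBasedNumberedShardSpec.serializeGroupKey` shown above.

```java
// Hypothetical sketch: murmur3_32_abs applied to a serialized group key.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;

import java.util.Collections;
import java.util.List;

public class HashPartitionFunctionExample
{
  public static void main(String[] args)
  {
    final ObjectMapper jsonMapper = new ObjectMapper();
    // Group key for partitionDimensions = ["visitor_id"] and a row with visitor_id = "v1":
    // a list of per-dimension value lists, matching what isInChunk builds at query time.
    final List<Object> groupKey = ImmutableList.of(Collections.singletonList("v1"));
    final byte[] serializedKey = HashBasedNumberedShardSpec.serializeGroupKey(jsonMapper, groupKey);

    // murmur3_32_abs = abs(murmur3_32(bytes) % numBuckets), i.e. a bucket ID in [0, numBuckets).
    final int bucketId = HashPartitionFunction.MURMUR3_32_ABS.hash(serializedKey, 3);
    System.out.println(bucketId);
  }
}
```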


@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import java.util.List;
/**
* This class is used for hash partitioning during ingestion. The {@link ShardSpecLookup} returned from
* {@link #createHashLookup} is used to determine what hash bucket the given input row will belong to.
*/
public class HashPartitioner
{
private final ObjectMapper jsonMapper;
private final HashPartitionFunction hashPartitionFunction;
private final List<String> partitionDimensions;
private final int numBuckets;
HashPartitioner(
final ObjectMapper jsonMapper,
final HashPartitionFunction hashPartitionFunction,
final List<String> partitionDimensions,
final int numBuckets
)
{
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.hashPartitionFunction = Preconditions.checkNotNull(hashPartitionFunction, "hashPartitionFunction");
this.partitionDimensions = Preconditions.checkNotNull(partitionDimensions, "partitionDimensions");
this.numBuckets = numBuckets;
}
ShardSpecLookup createHashLookup(final List<? extends ShardSpec> shardSpecs)
{
Preconditions.checkNotNull(hashPartitionFunction, "hashPartitionFunction");
return (long timestamp, InputRow row) -> {
int index = hash(timestamp, row);
return shardSpecs.get(index);
};
}
@VisibleForTesting
int hash(final long timestamp, final InputRow inputRow)
{
return hashPartitionFunction.hash(
HashBasedNumberedShardSpec.serializeGroupKey(jsonMapper, extractKeys(timestamp, inputRow)),
numBuckets
);
}
/**
* This method extracts keys for hash partitioning based on whether {@param partitionDimensions} is empty or not.
* If yes, then both {@param timestamp} and dimension values in {@param inputRow} are returned.
* Otherwise, values of {@param partitionDimensions} are returned.
*
* @param timestamp should be bucketed with query granularity
* @param inputRow row from input data
*
* @return a list of values of grouping keys
*/
@VisibleForTesting
List<Object> extractKeys(final long timestamp, final InputRow inputRow)
{
return extractKeys(partitionDimensions, timestamp, inputRow);
}
public static List<Object> extractKeys(
final List<String> partitionDimensions,
final long timestamp,
final InputRow inputRow
)
{
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
} else {
return Lists.transform(partitionDimensions, inputRow::getDimension);
}
}
}
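
A usage sketch, not part of the patch: ingestion code does not construct `HashPartitioner` directly (it is package-private); it obtains a `ShardSpecLookup` from a shard spec, which builds the partitioner internally. The `getShardSpec(long, InputRow)` call below is an assumption based on the lambda shape shown above.

```java
// Hypothetical sketch of ingestion-time bucket lookup through ShardSpecLookup.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.ShardSpecLookup;

import java.util.List;

public class HashLookupExample
{
  public static void main(String[] args)
  {
    final ObjectMapper mapper = new ObjectMapper();
    final List<ShardSpec> buckets = ImmutableList.of(
        new HashBucketShardSpec(0, 3, ImmutableList.of("visitor_id"), HashPartitionFunction.MURMUR3_32_ABS, mapper),
        new HashBucketShardSpec(1, 3, ImmutableList.of("visitor_id"), HashPartitionFunction.MURMUR3_32_ABS, mapper),
        new HashBucketShardSpec(2, 3, ImmutableList.of("visitor_id"), HashPartitionFunction.MURMUR3_32_ABS, mapper)
    );

    // getLookup builds a HashPartitioner under the hood and returns a ShardSpecLookup.
    final ShardSpecLookup lookup = buckets.get(0).getLookup(buckets);

    final long timestamp = DateTimes.nowUtc().getMillis();
    final InputRow row = new MapBasedInputRow(
        timestamp,
        ImmutableList.of("visitor_id"),
        ImmutableMap.of("visitor_id", "v1")
    );
    // Rows with the same visitor_id always land in the same hash bucket.
    System.out.println(lookup.getShardSpec(timestamp, row));
  }
}
```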


@ -79,12 +79,6 @@ public final class LinearShardSpec implements ShardSpec
return new LinearPartitionChunk<>(partitionNum, obj);
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
return true;
}
@Override
public boolean equals(Object o)
{


@ -52,12 +52,6 @@ public class NoneShardSpec implements ShardSpec
return new SingleElementPartitionChunk<>(obj);
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
return true;
}
@Override
@JsonIgnore
public int getPartitionNum()


@ -146,12 +146,6 @@ public class NumberedOverwriteShardSpec implements OverwriteShardSpec
return new NumberedOverwritingPartitionChunk<>(partitionId, obj);
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
return true;
}
@JsonProperty("partitionId")
@Override
public int getPartitionNum()


@ -106,12 +106,6 @@ public class NumberedShardSpec implements ShardSpec
return NumberedPartitionChunk.make(partitionNum, partitions, obj);
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
return true;
}
@Override
public String toString()
{


@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable;
import java.util.List;
@ -90,8 +91,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
return new BuildingSingleDimensionShardSpec(bucketId, dimension, start, end, partitionId);
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
public boolean isInChunk(InputRow inputRow)
{
return SingleDimensionShardSpec.isInChunk(dimension, start, end, inputRow);
}
@ -99,7 +99,14 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
@Override
public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
{
return SingleDimensionShardSpec.createLookup(shardSpecs);
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (((RangeBucketShardSpec) spec).isInChunk(row)) {
return spec;
}
}
throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
};
}
@Override


@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.InputRow;
import java.util.List;
import java.util.Map;
@ -56,9 +55,6 @@ public interface ShardSpec
@JsonIgnore
<T> PartitionChunk<T> createChunk(T obj);
@JsonIgnore
boolean isInChunk(long timestamp, InputRow inputRow);
/**
* Returns the partition ID of this segment.
*/


@ -21,6 +21,7 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
@ -115,7 +116,7 @@ public class SingleDimensionShardSpec implements ShardSpec
{
return (long timestamp, InputRow row) -> {
for (ShardSpec spec : shardSpecs) {
if (spec.isInChunk(timestamp, row)) {
if (((SingleDimensionShardSpec) spec).isInChunk(row)) {
return spec;
}
}
@ -164,8 +165,8 @@ public class SingleDimensionShardSpec implements ShardSpec
}
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
@VisibleForTesting
boolean isInChunk(InputRow inputRow)
{
return isInChunk(dimension, start, end, inputRow);
}


@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.RangeSet;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
@ -64,12 +63,6 @@ public class DataSegmentTest
return null;
}
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
return false;
}
@Override
public int getPartitionNum()
{


@ -36,8 +36,15 @@ public class BuildingHashBasedNumberedShardSpecTest
public void testConvert()
{
Assert.assertEquals(
new HashBasedNumberedShardSpec(5, 10, 5, 12, ImmutableList.of("dim"), mapper),
new BuildingHashBasedNumberedShardSpec(5, 5, 12, ImmutableList.of("dim"), mapper).convert(10)
new HashBasedNumberedShardSpec(5, 10, 5, 12, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper),
new BuildingHashBasedNumberedShardSpec(
5,
5,
12,
ImmutableList.of("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
mapper
).convert(10)
);
}
@ -46,8 +53,14 @@ public class BuildingHashBasedNumberedShardSpecTest
{
Assert.assertEquals(
new NumberedPartitionChunk<>(5, 0, "test"),
new BuildingHashBasedNumberedShardSpec(5, 5, 12, ImmutableList.of("dim"), mapper)
.createChunk("test")
new BuildingHashBasedNumberedShardSpec(
5,
5,
12,
ImmutableList.of("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
mapper
).createChunk("test")
);
}
@ -63,6 +76,7 @@ public class BuildingHashBasedNumberedShardSpecTest
5,
12,
ImmutableList.of("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
mapper
);
final String json = mapper.writeValueAsString(original);


@ -47,7 +47,8 @@ public class HashBasedNumberedPartialShardSpecTest
final HashBasedNumberedPartialShardSpec expected = new HashBasedNumberedPartialShardSpec(
ImmutableList.of("dim1", "dim2"),
1,
3
3,
HashPartitionFunction.MURMUR3_32_ABS
);
final byte[] json = MAPPER.writeValueAsBytes(expected);
final HashBasedNumberedPartialShardSpec fromJson = (HashBasedNumberedPartialShardSpec) MAPPER.readValue(
@ -63,17 +64,19 @@ public class HashBasedNumberedPartialShardSpecTest
final HashBasedNumberedPartialShardSpec expected = new HashBasedNumberedPartialShardSpec(
ImmutableList.of("dim1", "dim2"),
1,
3
3,
HashPartitionFunction.MURMUR3_32_ABS
);
final byte[] json = MAPPER.writeValueAsBytes(expected);
//noinspection unchecked
final Map<String, Object> map = MAPPER.readValue(json, Map.class);
Assert.assertEquals(4, map.size());
Assert.assertEquals(5, map.size());
Assert.assertEquals(HashBasedNumberedPartialShardSpec.TYPE, map.get("type"));
Assert.assertEquals(expected.getPartitionDimensions(), map.get("partitionDimensions"));
Assert.assertEquals(expected.getBucketId(), map.get("bucketId"));
Assert.assertEquals(expected.getNumBuckets(), map.get("numPartitions"));
Assert.assertEquals(expected.getBucketId(), map.get("bucketId"));
Assert.assertEquals(expected.getPartitionFunction().toString(), map.get("partitionFunction"));
}
@Test
@ -82,11 +85,12 @@ public class HashBasedNumberedPartialShardSpecTest
final HashBasedNumberedPartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(
ImmutableList.of("dim"),
2,
4
4,
null
);
final ShardSpec shardSpec = partialShardSpec.complete(MAPPER, 1, 3);
Assert.assertEquals(
new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), MAPPER),
new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), null, MAPPER),
shardSpec
);
}


@ -32,7 +32,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@ -69,6 +68,7 @@ public class HashBasedNumberedShardSpecTest
1,
3,
ImmutableList.of("visitor_id"),
HashPartitionFunction.MURMUR3_32_ABS,
objectMapper
)
),
@ -79,6 +79,10 @@ public class HashBasedNumberedShardSpecTest
Assert.assertEquals(1, ((HashBasedNumberedShardSpec) spec).getBucketId());
Assert.assertEquals(3, ((HashBasedNumberedShardSpec) spec).getNumBuckets());
Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) spec).getPartitionDimensions());
Assert.assertEquals(
HashPartitionFunction.MURMUR3_32_ABS,
((HashBasedNumberedShardSpec) spec).getPartitionFunction()
);
}
@Test
@ -102,15 +106,16 @@ public class HashBasedNumberedShardSpecTest
ImmutableList.of("visitor_id"),
((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionDimensions()
);
Assert.assertNull(((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionFunction());
}
@Test
public void testPartitionChunks()
{
final List<ShardSpec> specs = ImmutableList.of(
new HashBasedNumberedShardSpec(0, 3, 0, 3, null, objectMapper),
new HashBasedNumberedShardSpec(1, 3, 1, 3, null, objectMapper),
new HashBasedNumberedShardSpec(2, 3, 2, 3, null, objectMapper)
new HashBasedNumberedShardSpec(0, 3, 0, 3, null, null, objectMapper),
new HashBasedNumberedShardSpec(1, 3, 1, 3, null, null, objectMapper),
new HashBasedNumberedShardSpec(2, 3, 2, 3, null, null, objectMapper)
);
final List<PartitionChunk<String>> chunks = Lists.transform(
@ -149,37 +154,56 @@ public class HashBasedNumberedShardSpecTest
Assert.assertFalse(chunks.get(2).abuts(chunks.get(2)));
}
private HashPartitioner createHashPartitionerForHashInputRow(int numBuckets)
{
return new HashPartitioner(
objectMapper,
HashPartitionFunction.MURMUR3_32_ABS,
ImmutableList.of(),
numBuckets
)
{
@Override
int hash(final long timestamp, final InputRow inputRow)
{
return Math.abs(inputRow.hashCode() % numBuckets);
}
};
}
@Test
public void testIsInChunk()
{
List<ShardSpec> specs = new ArrayList<>();
List<HashBasedNumberedShardSpec> specs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
specs.add(new HashOverridenShardSpec(i, 3));
specs.add(newShardSpecForTesting(i, 3));
}
final HashPartitioner hashPartitioner = createHashPartitionerForHashInputRow(3);
assertExistsInOneSpec(specs, new HashInputRow(Integer.MIN_VALUE));
assertExistsInOneSpec(specs, new HashInputRow(Integer.MAX_VALUE));
assertExistsInOneSpec(specs, new HashInputRow(0));
assertExistsInOneSpec(specs, new HashInputRow(1000));
assertExistsInOneSpec(specs, new HashInputRow(-1000));
Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(Integer.MIN_VALUE)));
Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(Integer.MAX_VALUE)));
Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(0)));
Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(1000)));
Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(-1000)));
}
@Test
public void testIsInChunkWithMorePartitionsBeyondNumBucketsReturningTrue()
{
final int numBuckets = 3;
final List<ShardSpec> specs = IntStream.range(0, 10)
.mapToObj(i -> new HashOverridenShardSpec(i, numBuckets))
.collect(Collectors.toList());
final List<HashBasedNumberedShardSpec> specs = IntStream.range(0, 10)
.mapToObj(i -> newShardSpecForTesting(i, numBuckets))
.collect(Collectors.toList());
final HashPartitioner hashPartitioner = createHashPartitionerForHashInputRow(numBuckets);
for (int i = 0; i < 10; i++) {
final InputRow row = new HashInputRow(numBuckets * 10000 + i);
Assert.assertTrue(specs.get(i).isInChunk(row.getTimestampFromEpoch(), row));
Assert.assertTrue(isInChunk(specs.get(i), hashPartitioner, row.getTimestampFromEpoch(), row));
}
}
@Test
public void testGetGroupKey()
public void testExtractKeys()
{
final List<String> partitionDimensions1 = ImmutableList.of("visitor_id");
final DateTime time = DateTimes.nowUtc();
@ -190,16 +214,26 @@ public class HashBasedNumberedShardSpecTest
);
Assert.assertEquals(
ImmutableList.of(Collections.singletonList("v1")),
HashBasedNumberedShardSpec.getGroupKey(partitionDimensions1, time.getMillis(), inputRow)
new HashPartitioner(
objectMapper,
HashPartitionFunction.MURMUR3_32_ABS,
partitionDimensions1,
0 // not used
).extractKeys(time.getMillis(), inputRow)
);
Assert.assertEquals(
ImmutableList.of(
time.getMillis(),
ImmutableMap.of("cnt", Collections.singletonList(10), "visitor_id", Collections.singletonList("v1")))
.toString(),
time.getMillis(),
ImmutableMap.of("cnt", Collections.singletonList(10), "visitor_id", Collections.singletonList("v1"))
).toString(),
// empty list when partitionDimensions is null
HashBasedNumberedShardSpec.getGroupKey(ImmutableList.of(), time.getMillis(), inputRow).toString()
new HashPartitioner(
objectMapper,
HashPartitionFunction.MURMUR3_32_ABS,
ImmutableList.of(),
0 // not used
).extractKeys(time.getMillis(), inputRow).toString()
);
}
@ -212,89 +246,150 @@ public class HashBasedNumberedShardSpecTest
1,
3,
ImmutableList.of("visitor_id"),
null,
objectMapper
);
Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}
@Test
public void testPossibleInDomain()
public void testPossibleInDomainWithNullHashPartitionFunctionReturnAll()
{
final RangeSet<String> rangeSet = TreeRangeSet.create();
rangeSet.add(Range.closed("123", "123"));
final Map<String, RangeSet<String>> domain = ImmutableMap.of("visitor_id", rangeSet);
// Without partition info
HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec(
0,
1,
0,
1,
ImmutableList.of(),
objectMapper
);
Assert.assertTrue(shardSpec.possibleInDomain(domain));
// With partition info and matching partition dimensions
final int numBuckets = 3;
List<HashBasedNumberedShardSpec> shardSpecs = ImmutableList.of(
new HashBasedNumberedShardSpec(
0,
numBuckets,
0,
numBuckets,
ImmutableList.of("visitor_id"),
objectMapper
),
new HashBasedNumberedShardSpec(
1,
numBuckets,
1,
numBuckets,
ImmutableList.of("visitor_id"),
objectMapper
),
new HashBasedNumberedShardSpec(
2,
numBuckets,
2,
numBuckets,
ImmutableList.of("visitor_id"),
objectMapper
)
);
Assert.assertEquals(1, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count());
final List<HashBasedNumberedShardSpec> shardSpecs = new ArrayList<>();
for (int i = 0; i < numBuckets; i++) {
shardSpecs.add(
new HashBasedNumberedShardSpec(
i,
numBuckets,
i,
numBuckets,
ImmutableList.of("visitor_id"),
null,
objectMapper
)
);
}
Assert.assertEquals(numBuckets, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count());
}
// Partition dimensions not match
@Test
public void testPossibleInDomainWithoutPartitionDimensionsReturnAll()
{
final RangeSet<String> rangeSet = TreeRangeSet.create();
rangeSet.add(Range.closed("123", "123"));
final Map<String, RangeSet<String>> domain = ImmutableMap.of("visitor_id", rangeSet);
final int numBuckets = 3;
final List<HashBasedNumberedShardSpec> shardSpecs = new ArrayList<>();
for (int i = 0; i < numBuckets; i++) {
shardSpecs.add(
new HashBasedNumberedShardSpec(
i,
numBuckets,
i,
numBuckets,
ImmutableList.of(),
HashPartitionFunction.MURMUR3_32_ABS,
objectMapper
)
);
}
Assert.assertEquals(numBuckets, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count());
}
@Test
public void testPossibleInDomainFilterOnPartitionDimensionsReturnPrunedShards()
{
final RangeSet<String> rangeSet = TreeRangeSet.create();
rangeSet.add(Range.closed("123", "123"));
final Map<String, RangeSet<String>> domain = ImmutableMap.of("visitor_id", rangeSet);
final int numBuckets = 3;
final List<HashBasedNumberedShardSpec> shardSpecs = new ArrayList<>();
for (int i = 0; i < numBuckets; i++) {
shardSpecs.add(
new HashBasedNumberedShardSpec(
i,
numBuckets,
i,
numBuckets,
ImmutableList.of("visitor_id"),
HashPartitionFunction.MURMUR3_32_ABS,
objectMapper
)
);
}
Assert.assertEquals(1, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count());
}
@Test
public void testPossibleInDomainFilterOnNonPartitionDimensionsReturnAll()
{
final RangeSet<String> rangeSet = TreeRangeSet.create();
rangeSet.add(Range.closed("123", "123"));
final Map<String, RangeSet<String>> domain1 = ImmutableMap.of("vistor_id_1", rangeSet);
final int numBuckets = 3;
final List<HashBasedNumberedShardSpec> shardSpecs = new ArrayList<>();
for (int i = 0; i < numBuckets; i++) {
shardSpecs.add(
new HashBasedNumberedShardSpec(
i,
numBuckets,
i,
numBuckets,
ImmutableList.of("visitor_id"),
HashPartitionFunction.MURMUR3_32_ABS,
objectMapper
)
);
}
Assert.assertEquals(shardSpecs.size(), shardSpecs.stream().filter(s -> s.possibleInDomain(domain1)).count());
}
public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
public boolean existsInOneSpec(
List<? extends HashBasedNumberedShardSpec> specs,
HashPartitioner hashPartitioner,
InputRow row
)
{
for (ShardSpec spec : specs) {
if (spec.isInChunk(row.getTimestampFromEpoch(), row)) {
for (HashBasedNumberedShardSpec spec : specs) {
if (isInChunk(spec, hashPartitioner, row.getTimestampFromEpoch(), row)) {
return true;
}
}
throw new ISE("None of the partition matches");
return false;
}
public class HashOverridenShardSpec extends HashBasedNumberedShardSpec
private boolean isInChunk(
HashBasedNumberedShardSpec shardSpec,
HashPartitioner hashPartitioner,
long timestamp,
InputRow inputRow
)
{
public HashOverridenShardSpec(int partitionNum, int partitions)
{
super(partitionNum, partitions, partitionNum, partitions, null, objectMapper);
}
final int bucketId = hashPartitioner.hash(timestamp, inputRow);
return bucketId == shardSpec.getBucketId();
}
@Override
protected int hash(long timestamp, InputRow inputRow)
{
return inputRow.hashCode();
}
private HashBasedNumberedShardSpec newShardSpecForTesting(int partitionNum, int partitions)
{
return new HashBasedNumberedShardSpec(
partitionNum,
partitions,
partitionNum % partitions,
partitions,
null,
null,
objectMapper
);
}
public static class HashInputRow implements InputRow


@ -36,13 +36,20 @@ import java.util.List;
public class HashBucketShardSpecTest
{
private final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
@Test
public void testConvert()
{
Assert.assertEquals(
new BuildingHashBasedNumberedShardSpec(3, 5, 12, ImmutableList.of("dim"), mapper),
new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper).convert(3)
new BuildingHashBasedNumberedShardSpec(
3,
5,
12,
ImmutableList.of("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
mapper
),
new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper).convert(3)
);
}
@ -51,7 +58,13 @@ public class HashBucketShardSpecTest
{
Assert.assertEquals(
new NumberedPartitionChunk<>(5, 0, "test"),
new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper).createChunk("test")
new HashBucketShardSpec(
5,
12,
ImmutableList.of("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
mapper
).createChunk("test")
);
}
@ -59,9 +72,9 @@ public class HashBucketShardSpecTest
public void testShardSpecLookup()
{
final List<ShardSpec> shardSpecs = ImmutableList.of(
new HashBucketShardSpec(0, 3, ImmutableList.of("dim"), mapper),
new HashBucketShardSpec(1, 3, ImmutableList.of("dim"), mapper),
new HashBucketShardSpec(2, 3, ImmutableList.of("dim"), mapper)
new HashBucketShardSpec(0, 3, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper),
new HashBucketShardSpec(1, 3, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper),
new HashBucketShardSpec(2, 3, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper)
);
final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
final long currentTime = DateTimes.nowUtc().getMillis();
@ -103,7 +116,13 @@ public class HashBucketShardSpecTest
mapper.registerSubtypes(new NamedType(HashBucketShardSpec.class, HashBucketShardSpec.TYPE));
mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
final HashBucketShardSpec original = new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper);
final HashBucketShardSpec original = new HashBucketShardSpec(
5,
12,
ImmutableList.of("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
mapper
);
final String json = mapper.writeValueAsString(original);
final HashBucketShardSpec fromJson = (HashBucketShardSpec) mapper.readValue(json, ShardSpec.class);
Assert.assertEquals(original, fromJson);


@ -62,7 +62,7 @@ public class NumberedOverwriteShardSpecTest
(short) 1
);
Assert.assertFalse(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertFalse(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}


@ -198,7 +198,7 @@ public class NumberedShardSpecTest
{
final NumberedShardSpec shardSpec = new NumberedShardSpec(0, 1);
Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}


@ -47,9 +47,9 @@ public class PartitionHolderCompletenessTest
new Object[]{
// Simulate empty hash buckets
ImmutableList.of(
new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(0, 3, 0, 5, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper())
new HashBasedNumberedShardSpec(2, 3, 3, 5, null, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(0, 3, 0, 5, null, null, new ObjectMapper()),
new HashBasedNumberedShardSpec(1, 3, 2, 5, null, null, new ObjectMapper())
),
HashBasedNumberedShardSpec.class.getSimpleName()
},


@ -105,7 +105,7 @@ public class SingleDimensionShardSpecTest
ImmutableList.of("billy"),
Maps.transformValues(pair.rhs, input -> input)
);
Assert.assertEquals(StringUtils.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow.getTimestampFromEpoch(), inputRow));
Assert.assertEquals(StringUtils.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow));
}
}
}
@ -145,7 +145,7 @@ public class SingleDimensionShardSpecTest
{
final SingleDimensionShardSpec shardSpec = makeSpec("start", "end");
Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance()));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null)));
Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1)));
Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
}


@ -387,7 +387,18 @@ The configuration options are:
|maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`|
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|no|
|partitionFunction|A function to compute the hash of partition dimensions. See [Hash partition function](#hash-partition-function).|`murmur3_32_abs`|no|
##### Hash partition function
In hash partitioning, the partition function is used to compute the hash of the partition dimensions. The partition
dimension values are first serialized into a byte array as a whole, and then the partition function is applied to
compute the hash of that byte array.
Druid currently supports only one partition function.
|name|description|
|----|-----------|
|`murmur3_32_abs`|Applies an absolute value function to the result of [`murmur3_32`](https://guava.dev/releases/16.0/api/docs/com/google/common/hash/Hashing.html#murmur3_32()).|
### Single-dimension range partitioning


@ -90,7 +90,7 @@ This table compares the three available options:
| **Input locations** | Any [`inputSource`](./native-batch.md#input-sources). | Any Hadoop FileSystem or Druid datasource. | Any [`inputSource`](./native-batch.md#input-sources). |
| **File formats** | Any [`inputFormat`](./data-formats.md#input-format). | Any Hadoop InputFormat. | Any [`inputFormat`](./data-formats.md#input-format). |
| **[Rollup modes](#rollup)** | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Always perfect. | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). |
| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. |
| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec-1) for details. |
<a name="data-model"></a>

View File

@ -257,10 +257,10 @@ For perfect rollup, you should use either `hashed` (partitioning based on the ha
The three `partitionsSpec` types have different characteristics.
| PartitionsSpec | Ingestion speed | Partitioning method | Supported rollup mode | Segment pruning at query time |
| PartitionsSpec | Ingestion speed | Partitioning method | Supported rollup mode | Secondary partition pruning at query time |
|----------------|-----------------|---------------------|-----------------------|-------------------------------|
| `dynamic` | Fastest | Partitioning based on number of rows in segment. | Best-effort rollup | N/A |
| `hashed` | Moderate | Partitioning based on the hash value of partition dimensions. This partitioning may reduce your datasource size and query latency by improving data locality. See [Partitioning](./index.md#partitioning) for more details. | Perfect rollup | The broker can use the partition information to prune segments early to speed up queries if `partitionDimensions` is explicitly specified during ingestion. Since the broker knows how to hash `partitionDimensions` values to locate a segment, given a query including a filter on all the `partitionDimensions`, the broker can pick up only the segments holding the rows satisfying the filter on `partitionDimensions` for query processing. |
| `hashed` | Moderate | Partitioning based on the hash value of partition dimensions. This partitioning may reduce your datasource size and query latency by improving data locality. See [Partitioning](./index.md#partitioning) for more details. | Perfect rollup | The broker can use the partition information to prune segments early to speed up queries. Since the broker knows how to hash `partitionDimensions` values to locate a segment, given a query including a filter on all the `partitionDimensions`, the broker can pick up only the segments holding the rows satisfying the filter on `partitionDimensions` for query processing.<br/><br/>Note that `partitionDimensions` must be set at ingestion time to enable secondary partition pruning at query time. A sketch of this pruning follows the table.|
| `single_dim` | Slowest | Range partitioning based on the value of the partition dimension. Segment sizes may be skewed depending on the partition key distribution. This may reduce your datasource size and query latency by improving data locality. See [Partitioning](./index.md#partitioning) for more details. | Perfect rollup | The broker can use the partition information to prune segments early to speed up queries. Since the broker knows the range of `partitionDimension` values in each segment, given a query including a filter on the `partitionDimension`, the broker can pick up only the segments holding the rows satisfying the filter on `partitionDimension` for query processing. |
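To make the pruning idea in the table above concrete, here is a hedged sketch of the equality-filter case: the bucket of the filter's partition-dimension values is computed with the same partition function used at ingestion, and only segments in that bucket are scanned. The types below are made up for illustration; this is not the broker's actual code path, which also handles range filters.

```java
import java.util.ArrayList;
import java.util.List;

public class SecondaryPruningSketch
{
  // Minimal stand-in for a hash-partitioned segment's shard spec.
  static class HashedSegment
  {
    final int bucketId; // which hash bucket of the time chunk this segment holds

    HashedSegment(int bucketId)
    {
      this.bucketId = bucketId;
    }
  }

  // Keeps only the segments whose bucket matches the bucket of the filter's
  // partitionDimensions values; filterKeyBucket is assumed to be computed with
  // the same partition function and serialization used at ingestion time.
  static List<HashedSegment> prune(List<HashedSegment> segmentsInTimeChunk, int filterKeyBucket)
  {
    final List<HashedSegment> selected = new ArrayList<>();
    for (HashedSegment segment : segmentsInTimeChunk) {
      if (segment.bucketId == filterKeyBucket) {
        selected.add(segment);
      }
    }
    return selected;
  }

  public static void main(String[] args)
  {
    final List<HashedSegment> segments = new ArrayList<>();
    for (int i = 0; i < 6; i++) {
      segments.add(new HashedSegment(i));
    }
    // With a filter whose partition-dimension values hash to bucket 2,
    // only one of the six segments needs to be scanned.
    System.out.println(prune(segments, 2).size()); // 1
  }
}
```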
The recommended use case for each partitionsSpec is:
@ -294,9 +294,10 @@ How the worker task creates segments is:
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `hashed`|none|yes|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|none|no|
|targetRowsPerSegment|A target row count for each partition. If `numShards` is left unspecified, the Parallel task will determine a partition count automatically such that each partition has a row count close to the target, assuming evenly distributed keys in the input data. A target per-segment row count of 5 million is used if both `numShards` and `targetRowsPerSegment` are null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are null)|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|partitionFunction|A function to compute the hash of the partition dimensions. See [Hash partition function](#hash-partition-function).|`murmur3_32_abs`|no|
The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
The task runs in up to 3 phases: `partial dimension cardinality`, `partial segment generation` and `partial segment merge`.
@ -320,10 +321,21 @@ the time chunk and the hash value of `partitionDimensions` to be merged; each wo
falling in the same time chunk and the same hash value from multiple MiddleManager/Indexer processes and merges
them to create the final segments. Finally, they push the final segments to the deep storage at once.
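As a rough illustration of the merge step just described, the sketch below groups partial segments by time chunk and hash bucket so that each group can be merged into one final segment; the types and names are invented for this example and do not correspond to Druid classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeGroupingSketch
{
  // Minimal stand-in for a partial segment produced by a worker task.
  static class PartialSegment
  {
    final String timeChunk; // e.g. "2020-01-01/2020-01-02"
    final int bucketId;     // hash bucket of the rows' partitionDimensions values

    PartialSegment(String timeChunk, int bucketId)
    {
      this.timeChunk = timeChunk;
      this.bucketId = bucketId;
    }
  }

  // Groups partial segments by (time chunk, bucket); each group is later
  // merged into a single final segment and pushed to deep storage.
  static Map<String, List<PartialSegment>> groupForMerge(List<PartialSegment> partialSegments)
  {
    final Map<String, List<PartialSegment>> groups = new HashMap<>();
    for (PartialSegment segment : partialSegments) {
      final String key = segment.timeChunk + "#" + segment.bucketId;
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(segment);
    }
    return groups;
  }

  public static void main(String[] args)
  {
    final List<PartialSegment> partials = new ArrayList<>();
    partials.add(new PartialSegment("2020-01-01/2020-01-02", 0));
    partials.add(new PartialSegment("2020-01-01/2020-01-02", 0));
    partials.add(new PartialSegment("2020-01-01/2020-01-02", 1));
    System.out.println(groupForMerge(partials).keySet()); // two merge groups
  }
}
```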
##### Hash partition function
In hash partitioning, the partition function is used to compute the hash of the partition dimensions. The partition
dimension values are first serialized into a byte array as a whole, and then the partition function is applied to that
byte array to compute the hash.
Druid currently supports only one partition function.
|name|description|
|----|-----------|
|`murmur3_32_abs`|Applies an absolute value function to the result of [`murmur3_32`](https://guava.dev/releases/16.0/api/docs/com/google/common/hash/Hashing.html#murmur3_32()).|
#### Single-dimension range partitioning
> Single dimension range partitioning is currently not supported in the sequential mode of the Parallel task.
Try setting `maxNumConcurrentSubTasks` to a value larger than 1 to use this partitioning.
The Parallel task will use one subtask when you set `maxNumConcurrentSubTasks` to 1.
|property|description|default|required?|
|--------|-----------|-------|---------|
@ -744,6 +756,7 @@ For perfect rollup, you should use `hashed`.
|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|partitionFunction|A function to compute the hash of the partition dimensions. See [Hash partition function](#hash-partition-function).|`murmur3_32_abs`|no|
For best-effort rollup, you should use `dynamic`.

View File

@ -58,6 +58,7 @@ These parameters apply to all query types.
|parallelMergeInitialYieldRows|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.html#broker) for more details.|
|parallelMergeSmallBatchRows|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.html#broker) for more details.|
|useFilterCNF|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects.|
|secondaryPartitionPruning|`true`|Enable secondary partition pruning on the Broker. The Broker always prunes unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option enables additional pruning based on a filter on secondary partition dimensions. See the example below this table.|
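For example, a caller could opt a single query out of this pruning by setting the context key to `false`. The snippet below is a minimal sketch that shows the context only as the plain map that would be attached to a native query; the surrounding class is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class SecondaryPruningContextExample
{
  public static void main(String[] args)
  {
    // Context map attached to a native query; the key defaults to true,
    // so it only needs to be set when disabling the pruning.
    final Map<String, Object> context = new HashMap<>();
    context.put("secondaryPartitionPruning", false);
    System.out.println(context); // {secondaryPartitionPruning=false}
  }
}
```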
## Query-type-specific parameters

View File

@ -140,7 +140,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
),
@ -151,7 +151,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
),
@ -162,7 +162,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
@ -175,7 +175,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
),
@ -186,7 +186,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
@ -209,7 +209,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
@ -225,7 +225,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
@ -246,7 +246,7 @@ public class MaterializedViewSupervisorTest
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)

View File

@ -197,6 +197,11 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -29,6 +29,8 @@ import com.google.common.io.Closeables;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -36,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -169,6 +172,15 @@ public class DetermineHashedPartitionsJob implements Jobby
log.info("Determined Intervals for Job [%s].", config.getSegmentGranularIntervals());
}
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance());
PartitionsSpec partitionsSpec = config.getPartitionsSpec();
if (!(partitionsSpec instanceof HashedPartitionsSpec)) {
throw new ISE(
"%s is expected, but got %s",
HashedPartitionsSpec.class.getName(),
partitionsSpec.getClass().getName()
);
}
HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction();
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
@ -199,6 +211,7 @@ public class DetermineHashedPartitionsJob implements Jobby
i,
numberOfShards,
null,
partitionFunction,
HadoopDruidIndexerConfig.JSON_MAPPER
),
shardCount++

View File

@ -27,6 +27,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -61,13 +62,16 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
} else {
final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
final int shardsPerInterval;
final HashPartitionFunction partitionFunction;
if (partitionsSpec instanceof HashedPartitionsSpec) {
final HashedPartitionsSpec hashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec;
shardsPerInterval = PartitionsSpec.isEffectivelyNull(hashedPartitionsSpec.getNumShards())
? 1
: hashedPartitionsSpec.getNumShards();
partitionFunction = hashedPartitionsSpec.getPartitionFunction();
} else {
shardsPerInterval = 1;
partitionFunction = null;
}
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
int shardCount = 0;
@ -84,6 +88,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
i,
shardsPerInterval,
config.getPartitionsSpec().getPartitionDimensions(),
partitionFunction,
HadoopDruidIndexerConfig.JSON_MAPPER
),
shardCount++

View File

@ -55,6 +55,7 @@ import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
@ -489,7 +490,15 @@ public class BatchDeltaIngestionTest
INTERVAL_FULL.getStartMillis(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
new HashBasedNumberedShardSpec(
0,
1,
0,
1,
null,
HashPartitionFunction.MURMUR3_32_ABS,
HadoopDruidIndexerConfig.JSON_MAPPER
),
0
)
)

View File

@ -36,6 +36,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
@ -43,6 +45,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
@ -78,7 +81,8 @@ public class DetermineHashedPartitionsJobTest
0,
1,
first,
Granularities.DAY
Granularities.DAY,
null
},
{
DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@ -87,7 +91,8 @@ public class DetermineHashedPartitionsJobTest
0,
6,
second,
Granularities.DAY
Granularities.DAY,
null
},
{
DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@ -96,7 +101,8 @@ public class DetermineHashedPartitionsJobTest
0,
6,
third,
Granularities.DAY
Granularities.DAY,
null
},
{
DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
@ -105,7 +111,18 @@ public class DetermineHashedPartitionsJobTest
0,
6,
third,
Granularities.DAY
Granularities.DAY,
null
},
{
DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
1,
null,
0,
6,
third,
Granularities.DAY,
HashPartitionFunction.MURMUR3_32_ABS
},
{
DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.rows.in.timezone.tsv").getPath(),
@ -114,7 +131,8 @@ public class DetermineHashedPartitionsJobTest
0,
1,
first,
new PeriodGranularity(new Period("P1D"), null, DateTimes.inferTzFromString("America/Los_Angeles"))
new PeriodGranularity(new Period("P1D"), null, DateTimes.inferTzFromString("America/Los_Angeles")),
null
}
}
);
@ -127,7 +145,8 @@ public class DetermineHashedPartitionsJobTest
int errorMargin,
int expectedNumTimeBuckets,
int[] expectedNumOfShards,
Granularity segmentGranularity
Granularity segmentGranularity,
@Nullable HashPartitionFunction partitionFunction
)
{
this.expectedNumOfShards = expectedNumOfShards;
@ -194,7 +213,7 @@ public class DetermineHashedPartitionsJobTest
new HadoopTuningConfig(
tmpDir.getAbsolutePath(),
null,
new HashedPartitionsSpec(targetPartitionSize, null, null),
new HashedPartitionsSpec(targetPartitionSize, null, null, partitionFunction),
null,
null,
null,
@ -226,6 +245,8 @@ public class DetermineHashedPartitionsJobTest
{
DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig);
determineHashedPartitionsJob.run();
HashPartitionFunction expectedFunction = ((HashedPartitionsSpec) indexerConfig.getPartitionsSpec())
.getPartitionFunction();
Map<Long, List<HadoopyShardSpec>> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs();
Assert.assertEquals(
expectedNumTimeBuckets,
@ -238,6 +259,10 @@ public class DetermineHashedPartitionsJobTest
entry.getValue().size(),
errorMargin
);
for (HadoopyShardSpec eachShardSpec : entry.getValue()) {
final HashBasedNumberedShardSpec hashShardSpec = (HashBasedNumberedShardSpec) eachShardSpec.getActualSpec();
Assert.assertEquals(expectedFunction, hashShardSpec.getPartitionFunction());
}
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HadoopDruidDetermineConfigurationJobTest
{
@Test
public void testRunWithHashedPartitionsSpecCreateHashBasedNumberedShardSpecWithHashPartitionFunction()
{
final Set<Interval> intervals = ImmutableSet.of(
Intervals.of("2020-01-01/P1D"),
Intervals.of("2020-01-02/P1D"),
Intervals.of("2020-01-03/P1D")
);
final HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec(
null,
2,
null,
HashPartitionFunction.MURMUR3_32_ABS,
null,
null
);
final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
final HadoopDruidDetermineConfigurationJob job = new HadoopDruidDetermineConfigurationJob(config);
Assert.assertTrue(job.run());
final Map<Long, List<HadoopyShardSpec>> shardSpecs = resultCaptor.getValue();
Assert.assertEquals(3, shardSpecs.size());
for (Interval interval : intervals) {
final List<HadoopyShardSpec> shardSpecsPerInterval = shardSpecs.get(interval.getStartMillis());
Assert.assertEquals(2, shardSpecsPerInterval.size());
for (int i = 0; i < shardSpecsPerInterval.size(); i++) {
Assert.assertEquals(
new HashBasedNumberedShardSpec(
i,
shardSpecsPerInterval.size(),
i,
shardSpecsPerInterval.size(),
null,
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
),
shardSpecsPerInterval.get(i).getActualSpec()
);
}
}
}
@Test
public void testRunWithSingleDimensionPartitionsSpecCreateHashBasedNumberedShardSpecWithoutHashPartitionFunction()
{
final Set<Interval> intervals = ImmutableSet.of(
Intervals.of("2020-01-01/P1D"),
Intervals.of("2020-01-02/P1D"),
Intervals.of("2020-01-03/P1D")
);
final SingleDimensionPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(1000, null, "dim", false);
final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
final HadoopDruidDetermineConfigurationJob job = new HadoopDruidDetermineConfigurationJob(config);
Assert.assertTrue(job.run());
final Map<Long, List<HadoopyShardSpec>> shardSpecs = resultCaptor.getValue();
Assert.assertEquals(3, shardSpecs.size());
for (Interval interval : intervals) {
final List<HadoopyShardSpec> shardSpecsPerInterval = shardSpecs.get(interval.getStartMillis());
Assert.assertEquals(1, shardSpecsPerInterval.size());
Assert.assertEquals(
new HashBasedNumberedShardSpec(
0,
shardSpecsPerInterval.size(),
0,
shardSpecsPerInterval.size(),
ImmutableList.of("dim"),
null,
new ObjectMapper()
),
shardSpecsPerInterval.get(0).getActualSpec()
);
}
}
}

View File

@ -36,6 +36,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Test;
@ -62,10 +63,20 @@ public class HadoopDruidIndexerConfigTest
List<HadoopyShardSpec> shardSpecs = new ArrayList<>();
final int partitionCount = 10;
for (int i = 0; i < partitionCount; i++) {
shardSpecs.add(new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, partitionCount, i, partitionCount, null, new DefaultObjectMapper()),
i
));
shardSpecs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(
i,
partitionCount,
i,
partitionCount,
null,
HashPartitionFunction.MURMUR3_32_ABS,
new DefaultObjectMapper()
),
i
)
);
}
HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder()

View File

@ -43,6 +43,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
@ -559,14 +560,17 @@ public class IndexGeneratorJobTest
List<ShardSpec> specs = new ArrayList<>();
if ("hashed".equals(partitionType)) {
for (Integer[] shardInfo : (Integer[][]) shardInfoForEachShard) {
specs.add(new HashBasedNumberedShardSpec(
shardInfo[0],
shardInfo[1],
shardInfo[0],
shardInfo[1],
null,
HadoopDruidIndexerConfig.JSON_MAPPER
));
specs.add(
new HashBasedNumberedShardSpec(
shardInfo[0],
shardInfo[1],
shardInfo[0],
shardInfo[1],
null,
HashPartitionFunction.MURMUR3_32_ABS,
HadoopDruidIndexerConfig.JSON_MAPPER
)
);
}
} else if ("single".equals(partitionType)) {
int partitionNum = 0;

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
@ -37,6 +38,16 @@ public class HashedPartitionsSpecTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@Test
public void testDefaultValues()
{
final HashedPartitionsSpec defaultSpec = HashedPartitionsSpec.defaultSpec();
Assert.assertEquals(Collections.emptyList(), defaultSpec.getPartitionDimensions());
Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, defaultSpec.getPartitionFunction());
Assert.assertNull(defaultSpec.getNumShards());
Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, defaultSpec.getMaxRowsPerSegment().intValue());
}
@Test
public void havingTargetRowsPerSegmentOnly()
{

View File

@ -44,6 +44,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitioner;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -194,7 +195,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
while (inputRowIterator.hasNext()) {
InputRow inputRow = inputRowIterator.next();
//noinspection ConstantConditions (null rows are filtered out by FilteringCloseableInputRowIterator
// null rows are filtered out by FilteringCloseableInputRowIterator
DateTime timestamp = inputRow.getTimestamp();
//noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
Interval interval = granularitySpec.bucketInterval(timestamp).get();
@ -202,11 +203,9 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
interval,
(intervalKey) -> {
return DimensionCardinalityReport.createHllSketchForReport();
}
(intervalKey) -> DimensionCardinalityReport.createHllSketchForReport()
);
List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
List<Object> groupKey = HashPartitioner.extractKeys(
partitionDimensions,
queryGranularity.bucketStart(timestamp).getMillis(),
inputRow

View File

@ -103,6 +103,7 @@ public class HashPartitionAnalysis implements CompletePartitionAnalysis<Integer,
i,
numBuckets,
partitionsSpec.getPartitionDimensions(),
partitionsSpec.getPartitionFunction(),
toolbox.getJsonMapper()
))
.collect(Collectors.toList());

View File

@ -882,14 +882,18 @@ public class SegmentAllocateActionTest
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new HashBasedNumberedShardSpec(0, 2, 0, 2, ImmutableList.of("dim1"), objectMapper))
.shardSpec(
new HashBasedNumberedShardSpec(0, 2, 0, 2, ImmutableList.of("dim1"), null, objectMapper)
)
.size(0)
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularities.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new HashBasedNumberedShardSpec(1, 2, 1, 2, ImmutableList.of("dim1"), objectMapper))
.shardSpec(
new HashBasedNumberedShardSpec(1, 2, 1, 2, ImmutableList.of("dim1"), null, objectMapper)
)
.size(0)
.build()
)
@ -903,7 +907,7 @@ public class SegmentAllocateActionTest
"seq",
null,
true,
new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2),
new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2, null),
lockGranularity
);
final SegmentIdWithShardSpec segmentIdentifier = action.perform(task, taskActionTestKit.getTaskActionToolbox());

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@ -87,6 +86,7 @@ import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
@ -235,12 +235,20 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals(2, segments.get(0).getShardSpec().getNumCorePartitions());
Assert.assertEquals(
HashPartitionFunction.MURMUR3_32_ABS,
((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction()
);
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
Assert.assertEquals(2, segments.get(1).getShardSpec().getNumCorePartitions());
Assert.assertEquals(
HashPartitionFunction.MURMUR3_32_ABS,
((HashBasedNumberedShardSpec) segments.get(1).getShardSpec()).getPartitionFunction()
);
}
@Test
@ -484,6 +492,52 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals(
HashPartitionFunction.MURMUR3_32_ABS,
((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction()
);
}
@Test
public void testNumShardsAndHashPartitionFunctionProvided() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,1\n");
writer.write("2014-01-01T02:00:30Z,c,1\n");
}
IndexTask indexTask = new IndexTask(
null,
null,
createDefaultIngestionSpec(
jsonMapper,
tmpDir,
null,
null,
createTuningConfigWithPartitionsSpec(
new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS), true
),
false
),
null
);
final List<DataSegment> segments = runTask(indexTask).rhs;
Assert.assertEquals(1, segments.size());
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals(
HashPartitionFunction.MURMUR3_32_ABS,
((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction()
);
}
@Test
@ -520,6 +574,8 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals("test", segment.getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval());
Assert.assertEquals(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, hashBasedNumberedShardSpec.getPartitionFunction());
final File segmentFile = segmentLoader.getSegmentFiles(segment);
@ -540,17 +596,15 @@ public class IndexTaskTest extends IngestionTestBase
.map(cursor -> {
final DimensionSelector selector = cursor.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
try {
final int hash = HashBasedNumberedShardSpec.hash(
jsonMapper,
Collections.singletonList(selector.getObject())
);
cursor.advance();
return hash;
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
final int hash = HashPartitionFunction.MURMUR3_32_ABS.hash(
HashBasedNumberedShardSpec.serializeGroupKey(
jsonMapper,
Collections.singletonList(selector.getObject())
),
hashBasedNumberedShardSpec.getNumBuckets()
);
cursor.advance();
return hash;
})
.toList();

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
@ -52,8 +53,8 @@ public class ShardSpecsTest extends IngestionTestBase
@Test
public void testShardSpecSelectionWithNullPartitionDimension()
{
HashBucketShardSpec spec1 = new HashBucketShardSpec(0, 2, null, jsonMapper);
HashBucketShardSpec spec2 = new HashBucketShardSpec(1, 2, null, jsonMapper);
HashBucketShardSpec spec1 = new HashBucketShardSpec(0, 2, null, HashPartitionFunction.MURMUR3_32_ABS, jsonMapper);
HashBucketShardSpec spec2 = new HashBucketShardSpec(1, 2, null, HashPartitionFunction.MURMUR3_32_ABS, jsonMapper);
Map<Interval, List<BucketNumberedShardSpec<?>>> shardSpecMap = new HashMap<>();
shardSpecMap.put(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), ImmutableList.of(spec1, spec2));

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -46,6 +47,7 @@ public class GenericPartitionStatTest
ParallelIndexTestingFactory.PARTITION_ID,
ParallelIndexTestingFactory.PARTITION_ID + 1,
Collections.singletonList("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
),
ParallelIndexTestingFactory.NUM_ROWS,

View File

@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
@ -156,6 +157,17 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
assertHashedPartition(publishedSegments, expectedSegmentCount);
}
@Test
public void testRunWithHashPartitionFunction() throws Exception
{
final Set<DataSegment> publishedSegments = runTestTask(
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"), HashPartitionFunction.MURMUR3_32_ABS),
TaskState.SUCCESS,
false
);
assertHashedPartition(publishedSegments, 2);
}
@Test
public void testAppendLinearlyPartitionedSegmensToHashPartitionedDatasourceSuccessfullyAppend()
{
@ -259,12 +271,27 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
Assert.assertEquals(expectedNumSegments, segmentsInInterval.size());
for (DataSegment segment : segmentsInInterval) {
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, shardSpec.getPartitionFunction());
List<ScanResultValue> results = querySegment(segment, ImmutableList.of("dim1", "dim2"), tempSegmentDir);
final int hash = HashBasedNumberedShardSpec.hash(getObjectMapper(), (List<Object>) results.get(0).getEvents());
final int hash = shardSpec.getPartitionFunction().hash(
HashBasedNumberedShardSpec.serializeGroupKey(
getObjectMapper(),
(List<Object>) results.get(0).getEvents()
),
shardSpec.getNumBuckets()
);
for (ScanResultValue value : results) {
Assert.assertEquals(
hash,
HashBasedNumberedShardSpec.hash(getObjectMapper(), (List<Object>) value.getEvents())
shardSpec.getPartitionFunction().hash(
HashBasedNumberedShardSpec.serializeGroupKey(
getObjectMapper(),
(List<Object>) value.getEvents()
),
shardSpec.getNumBuckets()
)
);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -132,7 +133,14 @@ public class ParallelIndexSupervisorTaskTest
false,
"subTaskId",
createInterval(id),
new BuildingHashBasedNumberedShardSpec(id, id, id + 1, null, new ObjectMapper())
new BuildingHashBasedNumberedShardSpec(
id,
id,
id + 1,
null,
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
)
);
}

View File

@ -45,6 +45,7 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -105,6 +106,7 @@ class ParallelIndexTestingFactory
PARTITION_ID,
PARTITION_ID + 1,
Collections.singletonList("dim"),
HashPartitionFunction.MURMUR3_32_ABS,
ParallelIndexTestingFactory.NESTED_OBJECT_MAPPER
);

View File

@ -88,6 +88,7 @@ public class PerfectRollupWorkerTaskTest
1,
null,
null,
null,
null
);

View File

@ -971,8 +971,8 @@ public class TaskLockboxTest
final Task task = NoopTask.create();
lockbox.add(task);
allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 1, 3));
allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 3, 5));
allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 1, 3, null));
allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 3, 5, null));
}
private void allocateSegmentsAndAssert(

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@ -74,6 +75,7 @@ public class ITHadoopIndexTest extends AbstractITBatchIndexTest
{new HashedPartitionsSpec(3, null, null)},
{new HashedPartitionsSpec(null, 3, ImmutableList.of("page"))},
{new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user"))},
{new HashedPartitionsSpec(null, 3, ImmutableList.of("page"), HashPartitionFunction.MURMUR3_32_ABS)},
{new SingleDimensionPartitionsSpec(1000, null, null, false)},
{new SingleDimensionPartitionsSpec(1000, null, "page", false)},
{new SingleDimensionPartitionsSpec(1000, null, null, true)},

View File

@ -26,6 +26,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
@ -52,7 +53,8 @@ public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest
public static Object[][] resources()
{
return new Object[][]{
{new HashedPartitionsSpec(null, 2, null)},
{new HashedPartitionsSpec(null, 2, null, HashPartitionFunction.MURMUR3_32_ABS)},
{new HashedPartitionsSpec(null, 2, null, null)},
{new SingleDimensionPartitionsSpec(2, null, "namespace", false)}
};
}

View File

@ -57,6 +57,7 @@ public class QueryContexts
public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY = "numRetriesOnMissingSegments";
public static final String RETURN_PARTIAL_RESULTS_KEY = "returnPartialResults";
public static final String USE_CACHE_KEY = "useCache";
public static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@ -74,6 +75,7 @@ public class QueryContexts
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000;
public static final boolean DEFAULT_USE_FILTER_CNF = false;
public static final boolean DEFAULT_SECONDARY_PARTITION_PRUNING = true;
@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@ -278,6 +280,10 @@ public class QueryContexts
return parseBoolean(query, JOIN_FILTER_REWRITE_ENABLE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE);
}
public static <T> boolean isSecondaryPartitionPruningEnabled(Query<T> query)
{
return parseBoolean(query, SECONDARY_PARTITION_PRUNING_KEY, DEFAULT_SECONDARY_PARTITION_PRUNING);
}
public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{

View File

@ -107,4 +107,28 @@ public class QueryContextsTest
QueryContexts.withMaxScatterGatherBytes(query, 100);
}
@Test
public void testDisableSegmentPruning()
{
Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
ImmutableMap.of(QueryContexts.SECONDARY_PARTITION_PRUNING_KEY, false)
);
Assert.assertFalse(QueryContexts.isSecondaryPartitionPruningEnabled(query));
}
@Test
public void testDefaultSegmentPruning()
{
Query<?> query = new TestQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
false,
ImmutableMap.of()
);
Assert.assertTrue(QueryContexts.isSecondaryPartitionPruningEnabled(query));
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.inject.Inject;
@ -423,12 +424,17 @@ public class CachingClusteredClient implements QuerySegmentWalker
final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = new HashMap<>();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
final Set<PartitionChunk<ServerSelector>> filteredChunks = DimFilterUtils.filterShards(
query.getFilter(),
holder.getObject(),
partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(),
dimensionRangeCache
);
final Set<PartitionChunk<ServerSelector>> filteredChunks;
if (QueryContexts.isSecondaryPartitionPruningEnabled(query)) {
filteredChunks = DimFilterUtils.filterShards(
query.getFilter(),
holder.getObject(),
partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(),
dimensionRangeCache
);
} else {
filteredChunks = Sets.newHashSet(holder.getObject());
}
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
final SegmentDescriptor segment = new SegmentDescriptor(

View File

@ -72,6 +72,7 @@ import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChestWarehouse;
@ -106,6 +107,7 @@ import org.apache.druid.query.search.SearchQueryQueryToolChest;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryResultValue;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@ -127,6 +129,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
@ -152,17 +155,21 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
/**
*
@ -1544,7 +1551,7 @@ public class CachingClusteredClientTest
}
@Test
public void testHashBasedPruning()
public void testHashBasedPruningQueryContextEnabledWithPartitionFunctionAndPartitionDimensionsDoSegmentPruning()
{
DimFilter filter = new AndDimFilter(
new SelectorDimFilter("dim1", "a", null),
@ -1577,23 +1584,101 @@ public class CachingClusteredClientTest
final DruidServer lastServer = servers[random.nextInt(servers.length)];
List<String> partitionDimensions1 = ImmutableList.of("dim1");
ServerSelector selector1 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 0, 6);
ServerSelector selector2 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 1, 6);
ServerSelector selector3 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 2, 6);
ServerSelector selector4 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 3, 6);
ServerSelector selector5 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 4, 6);
ServerSelector selector6 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 5, 6);
ServerSelector selector1 = makeMockHashBasedSelector(
lastServer,
partitionDimensions1,
HashPartitionFunction.MURMUR3_32_ABS,
0,
6
);
ServerSelector selector2 = makeMockHashBasedSelector(
lastServer,
partitionDimensions1,
HashPartitionFunction.MURMUR3_32_ABS,
1,
6
);
ServerSelector selector3 = makeMockHashBasedSelector(
lastServer,
partitionDimensions1,
HashPartitionFunction.MURMUR3_32_ABS,
2,
6
);
ServerSelector selector4 = makeMockHashBasedSelector(
lastServer,
partitionDimensions1,
HashPartitionFunction.MURMUR3_32_ABS,
3,
6
);
ServerSelector selector5 = makeMockHashBasedSelector(
lastServer,
partitionDimensions1,
HashPartitionFunction.MURMUR3_32_ABS,
4,
6
);
ServerSelector selector6 = makeMockHashBasedSelector(
lastServer,
partitionDimensions1,
HashPartitionFunction.MURMUR3_32_ABS,
5,
6
);
List<String> partitionDimensions2 = ImmutableList.of("dim2");
ServerSelector selector7 = makeMockHashBasedSelector(lastServer, partitionDimensions2, 0, 3);
ServerSelector selector8 = makeMockHashBasedSelector(lastServer, partitionDimensions2, 1, 3);
ServerSelector selector9 = makeMockHashBasedSelector(lastServer, partitionDimensions2, 2, 3);
ServerSelector selector7 = makeMockHashBasedSelector(
lastServer,
partitionDimensions2,
HashPartitionFunction.MURMUR3_32_ABS,
0,
3
);
ServerSelector selector8 = makeMockHashBasedSelector(
lastServer,
partitionDimensions2,
HashPartitionFunction.MURMUR3_32_ABS,
1,
3
);
ServerSelector selector9 = makeMockHashBasedSelector(
lastServer,
partitionDimensions2,
HashPartitionFunction.MURMUR3_32_ABS,
2,
3
);
List<String> partitionDimensions3 = ImmutableList.of("dim1", "dim3");
ServerSelector selector10 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 0, 4);
ServerSelector selector11 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 1, 4);
ServerSelector selector12 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 2, 4);
ServerSelector selector13 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 3, 4);
ServerSelector selector10 = makeMockHashBasedSelector(
lastServer,
partitionDimensions3,
HashPartitionFunction.MURMUR3_32_ABS,
0,
4
);
ServerSelector selector11 = makeMockHashBasedSelector(
lastServer,
partitionDimensions3,
HashPartitionFunction.MURMUR3_32_ABS,
1,
4
);
ServerSelector selector12 = makeMockHashBasedSelector(
lastServer,
partitionDimensions3,
HashPartitionFunction.MURMUR3_32_ABS,
2,
4
);
ServerSelector selector13 = makeMockHashBasedSelector(
lastServer,
partitionDimensions3,
HashPartitionFunction.MURMUR3_32_ABS,
3,
4
);
timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 6, selector1));
timeline.add(interval1, "v", new NumberedPartitionChunk<>(1, 6, selector2));
@ -1640,9 +1725,133 @@ public class CachingClusteredClientTest
Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec());
}
@Test
public void testHashBasedPruningQueryContextDisabledNoSegmentPruning()
{
testNoSegmentPruningForHashPartitionedSegments(false, HashPartitionFunction.MURMUR3_32_ABS, false);
}
@Test
public void testHashBasedPruningWithoutPartitionFunctionNoSegmentPruning()
{
testNoSegmentPruningForHashPartitionedSegments(true, null, false);
}
@Test
public void testHashBasedPruningWithEmptyPartitionDimensionsNoSegmentPruning()
{
testNoSegmentPruningForHashPartitionedSegments(true, HashPartitionFunction.MURMUR3_32_ABS, true);
}
private void testNoSegmentPruningForHashPartitionedSegments(
boolean enableSegmentPruning,
@Nullable HashPartitionFunction partitionFunction,
boolean useEmptyPartitionDimensions
)
{
DimFilter filter = new AndDimFilter(
new SelectorDimFilter("dim1", "a", null),
new BoundDimFilter("dim2", "e", "zzz", true, true, false, null, StringComparators.LEXICOGRAPHIC),
// Equivalent filter of dim3 below is InDimFilter("dim3", Arrays.asList("c"), null)
new AndDimFilter(
new InDimFilter("dim3", Arrays.asList("a", "c", "e", "g"), null),
new BoundDimFilter("dim3", "aaa", "ddd", false, false, false, null, StringComparators.LEXICOGRAPHIC)
)
);
final Map<String, Object> context = new HashMap<>(CONTEXT);
context.put(QueryContexts.SECONDARY_PARTITION_PRUNING_KEY, enableSegmentPruning);
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.filters(filter)
.granularity(GRANULARITY)
.intervals(SEG_SPEC)
.intervals("2011-01-05/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.context(context)
.randomQueryId();
TimeseriesQuery query = builder.build();
QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
final Interval interval1 = Intervals.of("2011-01-06/2011-01-07");
final Interval interval2 = Intervals.of("2011-01-07/2011-01-08");
final Interval interval3 = Intervals.of("2011-01-08/2011-01-09");
final DruidServer lastServer = servers[random.nextInt(servers.length)];
List<String> partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim1");
final int numPartitions1 = 6;
for (int i = 0; i < numPartitions1; i++) {
ServerSelector selector = makeMockHashBasedSelector(
lastServer,
partitionDimensions,
partitionFunction,
i,
numPartitions1
);
timeline.add(interval1, "v", new NumberedPartitionChunk<>(i, numPartitions1, selector));
}
partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim2");
final int numPartitions2 = 3;
for (int i = 0; i < numPartitions2; i++) {
ServerSelector selector = makeMockHashBasedSelector(
lastServer,
partitionDimensions,
partitionFunction,
i,
numPartitions2
);
timeline.add(interval2, "v", new NumberedPartitionChunk<>(i, numPartitions2, selector));
}
partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim1", "dim3");
final int numPartitions3 = 4;
for (int i = 0; i < numPartitions3; i++) {
ServerSelector selector = makeMockHashBasedSelector(
lastServer,
partitionDimensions,
partitionFunction,
i,
numPartitions3
);
timeline.add(interval3, "v", new NumberedPartitionChunk<>(i, numPartitions3, selector));
}
final Capture<QueryPlus> capture = Capture.newInstance();
final Capture<ResponseContext> contextCap = Capture.newInstance();
QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class);
EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap)))
.andReturn(Sequences.empty())
.anyTimes();
EasyMock.expect(serverView.getQueryRunner(lastServer))
.andReturn(mockRunner)
.anyTimes();
EasyMock.replay(serverView);
EasyMock.replay(mockRunner);
// Expected to read all segments
Set<SegmentDescriptor> expectedDescriptors = new HashSet<>();
IntStream.range(0, numPartitions1).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval1, "v", i)));
IntStream.range(0, numPartitions2).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval2, "v", i)));
IntStream.range(0, numPartitions3).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval3, "v", i)));
runner.run(QueryPlus.wrap(query)).toList();
QuerySegmentSpec querySegmentSpec = ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec();
Assert.assertSame(MultipleSpecificSegmentSpec.class, querySegmentSpec.getClass());
final Set<SegmentDescriptor> actualDescriptors = new HashSet<>(
((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors()
);
Assert.assertEquals(expectedDescriptors, actualDescriptors);
}
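// Builds a mock ServerSelector for a hash-partitioned segment using the given partition dimensions and partition function.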
private ServerSelector makeMockHashBasedSelector(
DruidServer server,
List<String> partitionDimensions,
@Nullable HashPartitionFunction partitionFunction,
int partitionNum,
int partitions
)
@ -1658,6 +1867,7 @@ public class CachingClusteredClientTest
partitionNum,
partitions,
partitionDimensions,
partitionFunction,
ServerTestHelper.MAPPER
),
null,

View File

@ -1079,7 +1079,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IOException
{
final PartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(null, 2, 5);
final PartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(null, 2, 5, null);
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
@ -1150,7 +1150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
"seq3",
null,
interval,
new HashBasedNumberedPartialShardSpec(null, 2, 3),
new HashBasedNumberedPartialShardSpec(null, 2, 3, null),
"version",
true
);

View File

@ -29,6 +29,7 @@ import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
@ -103,9 +104,36 @@ public class SegmentPublisherHelperTest
public void testAnnotateCorePartitionSetSizeForHashNumberedShardSpec()
{
final Set<DataSegment> segments = ImmutableSet.of(
newSegment(new BuildingHashBasedNumberedShardSpec(0, 0, 3, null, new ObjectMapper())),
newSegment(new BuildingHashBasedNumberedShardSpec(1, 1, 3, null, new ObjectMapper())),
newSegment(new BuildingHashBasedNumberedShardSpec(2, 2, 3, null, new ObjectMapper()))
newSegment(
new BuildingHashBasedNumberedShardSpec(
0,
0,
3,
null,
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
)
),
newSegment(
new BuildingHashBasedNumberedShardSpec(
1,
1,
3,
null,
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
)
),
newSegment(
new BuildingHashBasedNumberedShardSpec(
2,
2,
3,
null,
HashPartitionFunction.MURMUR3_32_ABS,
new ObjectMapper()
)
)
);
final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
for (DataSegment segment : annotated) {
@ -147,9 +175,9 @@ public class SegmentPublisherHelperTest
public void testAnnotateShardSpecThrowingExceptionForBucketNumberedShardSpec()
{
final Set<DataSegment> segments = ImmutableSet.of(
newSegment(new HashBucketShardSpec(0, 3, null, new ObjectMapper())),
newSegment(new HashBucketShardSpec(1, 3, null, new ObjectMapper())),
newSegment(new HashBucketShardSpec(2, 3, null, new ObjectMapper()))
newSegment(new HashBucketShardSpec(0, 3, null, HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper())),
newSegment(new HashBucketShardSpec(1, 3, null, HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper())),
newSegment(new HashBucketShardSpec(2, 3, null, HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper()))
);
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Cannot publish segments with shardSpec");

View File

@ -121,6 +121,7 @@ public class CompactSegmentsTest
bucketId,
numBuckets,
ImmutableList.of("dim"),
null,
JSON_MAPPER
)
},

View File

@ -306,6 +306,7 @@ numerics
parameterized
parseable
partitioner
partitionFunction
partitionsSpec
performant
plaintext
@ -361,6 +362,7 @@ rsync
runtime
schemas
searchable
secondaryPartitionPruning
seekable-stream
servlet
simple-client-sslcontext