From 0cc9eb4903e7bddb4c1484984bf87c8fab7648df Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 24 Sep 2020 16:32:56 -0700
Subject: [PATCH] Store hash partition function in dataSegment and allow
 segment pruning only when hash partition function is provided (#10288)

* Store hash partition function in dataSegment and allow segment pruning only when hash partition function is provided
* query context
* fix tests; add more test
* javadoc
* docs and more tests
* remove default and hadoop tests
* consistent name and fix javadoc
* spelling and field name
* default function for partitionsSpec
* other comments
* address comments
* fix tests and spelling
* test
* doc
---
 .../partitions/HashedPartitionsSpec.java | 31 ++-
 .../BuildingHashBasedNumberedShardSpec.java | 17 +-
 .../timeline/partition/BuildingShardSpec.java | 7 -
 .../HashBasedNumberedPartialShardSpec.java | 19 +-
 .../partition/HashBasedNumberedShardSpec.java | 249 ++++++++----------
 .../partition/HashBucketShardSpec.java | 40 ++-
 .../partition/HashPartitionFunction.java | 62 +++++
 .../timeline/partition/HashPartitioner.java | 101 +++++++
 .../timeline/partition/LinearShardSpec.java | 6 -
 .../timeline/partition/NoneShardSpec.java | 6 -
 .../partition/NumberedOverwriteShardSpec.java | 6 -
 .../timeline/partition/NumberedShardSpec.java | 6 -
 .../partition/RangeBucketShardSpec.java | 13 +-
 .../druid/timeline/partition/ShardSpec.java | 4 -
 .../partition/SingleDimensionShardSpec.java | 7 +-
 .../druid/timeline/DataSegmentTest.java | 7 -
 ...uildingHashBasedNumberedShardSpecTest.java | 22 +-
 ...HashBasedNumberedPartialShardSpecTest.java | 14 +-
 .../HashBasedNumberedShardSpecTest.java | 249 ++++++++++++------
 .../partition/HashBucketShardSpecTest.java | 35 ++-
 .../NumberedOverwriteShardSpecTest.java | 2 +-
 .../partition/NumberedShardSpecTest.java | 2 +-
 .../PartitionHolderCompletenessTest.java | 6 +-
 .../SingleDimensionShardSpecTest.java | 4 +-
 docs/ingestion/hadoop.md | 11 +
 docs/ingestion/index.md | 2 +-
 docs/ingestion/native-batch.md | 23 +-
 docs/querying/query-context.md | 1 +
 .../MaterializedViewSupervisorTest.java | 16 +-
 indexing-hadoop/pom.xml | 5 +
 .../indexer/DetermineHashedPartitionsJob.java | 13 +
 .../HadoopDruidDetermineConfigurationJob.java | 5 +
 .../indexer/BatchDeltaIngestionTest.java | 11 +-
 .../DetermineHashedPartitionsJobTest.java | 39 ++-
 ...oopDruidDetermineConfigurationJobTest.java | 127 +++++++++
 .../indexer/HadoopDruidIndexerConfigTest.java | 19 +-
 .../druid/indexer/IndexGeneratorJobTest.java | 20 +-
 .../partitions/HashedPartitionsSpecTest.java | 11 +
 .../PartialDimensionCardinalityTask.java | 9 +-
 .../partition/HashPartitionAnalysis.java | 1 +
 .../actions/SegmentAllocateActionTest.java | 10 +-
 .../indexing/common/task/IndexTaskTest.java | 78 +++++-
 .../indexing/common/task/ShardSpecsTest.java | 5 +-
 .../parallel/GenericPartitionStatTest.java | 2 +
 ...rtitionMultiPhaseParallelIndexingTest.java | 31 ++-
 .../ParallelIndexSupervisorTaskTest.java | 10 +-
 .../parallel/ParallelIndexTestingFactory.java | 2 +
 .../parallel/PerfectRollupWorkerTaskTest.java | 1 +
 .../indexing/overlord/TaskLockboxTest.java | 4 +-
 .../druid/tests/hadoop/ITHadoopIndexTest.java | 2 +
 .../ITPerfectRollupParallelIndexTest.java | 4 +-
 .../org/apache/druid/query/QueryContexts.java | 6 +
 .../apache/druid/query/QueryContextsTest.java | 24 ++
 .../druid/client/CachingClusteredClient.java | 18 +-
 .../client/CachingClusteredClientTest.java | 238 ++++++++++++++++-
 ...exerSQLMetadataStorageCoordinatorTest.java | 4 +-
 .../SegmentPublisherHelperTest.java | 40 ++-
.../coordinator/duty/CompactSegmentsTest.java | 1 + website/.spelling | 2 + 59 files changed, 1315 insertions(+), 395 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/timeline/partition/HashPartitionFunction.java create mode 100644 core/src/main/java/org/apache/druid/timeline/partition/HashPartitioner.java create mode 100644 indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java index d78636c00bb..59d6f36a0c8 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.Property; +import org.apache.druid.timeline.partition.HashPartitionFunction; import javax.annotation.Nullable; import java.util.Collections; @@ -38,16 +39,18 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec static final String NUM_SHARDS = "numShards"; private static final String FORCE_GUARANTEED_ROLLUP_COMPATIBLE = ""; + private static final HashPartitionFunction DEFAULT_HASH_FUNCTION = HashPartitionFunction.MURMUR3_32_ABS; @Nullable private final Integer maxRowsPerSegment; @Nullable private final Integer numShards; private final List partitionDimensions; + private final HashPartitionFunction partitionFunction; public static HashedPartitionsSpec defaultSpec() { - return new HashedPartitionsSpec(null, null, null, null, null); + return new HashedPartitionsSpec(null, null, null, null, null, null); } @JsonCreator @@ -55,6 +58,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment, @JsonProperty(NUM_SHARDS) @Nullable Integer numShards, @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, + @JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction, // Deprecated properties preserved for backward compatibility: @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable @@ -84,6 +88,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec Checks.checkAtMostOneNotNull(target, new Property<>(NUM_SHARDS, adjustedNumShards)); this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; + this.partitionFunction = partitionFunction == null ? 
DEFAULT_HASH_FUNCTION : partitionFunction; this.numShards = adjustedNumShards; // Supply default for targetRowsPerSegment if needed @@ -107,13 +112,23 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec ); } + public HashedPartitionsSpec( + @Nullable Integer maxRowsPerSegment, + @Nullable Integer numShards, + @Nullable List partitionDimensions, + @Nullable HashPartitionFunction partitionFunction + ) + { + this(null, numShards, partitionDimensions, partitionFunction, maxRowsPerSegment, null); + } + public HashedPartitionsSpec( @Nullable Integer maxRowsPerSegment, @Nullable Integer numShards, @Nullable List partitionDimensions ) { - this(null, numShards, partitionDimensions, null, maxRowsPerSegment); + this(null, numShards, partitionDimensions, null, maxRowsPerSegment, null); } @Nullable @@ -157,6 +172,12 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec return partitionDimensions; } + @JsonProperty + public HashPartitionFunction getPartitionFunction() + { + return partitionFunction; + } + @Override public String getForceGuaranteedRollupIncompatiblityReason() { @@ -175,13 +196,14 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec HashedPartitionsSpec that = (HashedPartitionsSpec) o; return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && Objects.equals(numShards, that.numShards) && - Objects.equals(partitionDimensions, that.partitionDimensions); + Objects.equals(partitionDimensions, that.partitionDimensions) && + partitionFunction == that.partitionFunction; } @Override public int hashCode() { - return Objects.hash(maxRowsPerSegment, numShards, partitionDimensions); + return Objects.hash(maxRowsPerSegment, numShards, partitionDimensions, partitionFunction); } @Override @@ -191,6 +213,7 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec "maxRowsPerSegment=" + maxRowsPerSegment + ", numShards=" + numShards + ", partitionDimensions=" + partitionDimensions + + ", partitionFunction=" + partitionFunction + '}'; } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java index fb896fc2ac8..f5c43b075b3 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import javax.annotation.Nullable; import java.util.List; @@ -41,6 +42,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec partitionDimensions; + private final HashPartitionFunction partitionFunction; private final ObjectMapper jsonMapper; @JsonCreator @@ -49,6 +51,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec partitionDimensions, + @JsonProperty("partitionFunction") HashPartitionFunction partitionFunction, @JacksonInject ObjectMapper jsonMapper ) { @@ -58,6 +61,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec PartitionChunk createChunk(T obj) { @@ -105,6 +115,7 @@ public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec partitionDimensions, 
@JsonProperty("bucketId") int bucketId, - @JsonProperty("numPartitions") int numBuckets + @JsonProperty("numPartitions") int numBuckets, + @JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction // nullable for backward compatibility ) { this.partitionDimensions = partitionDimensions; this.bucketId = bucketId; this.numBuckets = numBuckets; + this.partitionFunction = partitionFunction; } @Nullable @@ -67,6 +71,13 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec return numBuckets; } + @JsonProperty + @Nullable + public HashPartitionFunction getPartitionFunction() + { + return partitionFunction; + } + @Override public ShardSpec complete(ObjectMapper objectMapper, int partitionId, int numCorePartitions) { @@ -76,6 +87,7 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec bucketId, numBuckets, partitionDimensions, + partitionFunction, objectMapper ); } @@ -98,12 +110,13 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec HashBasedNumberedPartialShardSpec that = (HashBasedNumberedPartialShardSpec) o; return bucketId == that.bucketId && numBuckets == that.numBuckets && - Objects.equals(partitionDimensions, that.partitionDimensions); + Objects.equals(partitionDimensions, that.partitionDimensions) && + Objects.equals(partitionFunction, that.partitionFunction); } @Override public int hashCode() { - return Objects.hash(partitionDimensions, bucketId, numBuckets); + return Objects.hash(partitionDimensions, bucketId, numBuckets, partitionFunction); } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java index f564c2fe3b2..86a05547c61 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java @@ -21,20 +21,15 @@ package org.apache.druid.timeline.partition; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BoundType; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.RangeSet; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.Rows; +import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; import java.util.Collections; @@ -49,17 +44,29 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec { public static final List DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of(); - private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(); - private final int bucketId; + /** * Number of hash buckets */ private final int numBuckets; private final ObjectMapper jsonMapper; - @JsonIgnore private final List partitionDimensions; + /** + * A hash function to use for both hash partitioning at ingestion time and pruning segments at query time. 
+ * + * During ingestion, the partition function is defaulted to {@link HashPartitionFunction#MURMUR3_32_ABS} if this + * variable is null. See {@link HashPartitioner} for details. + * + * During query, this function will be null unless it is explicitly specified in + * {@link org.apache.druid.indexer.partitions.HashedPartitionsSpec} at ingestion time. This is because the default + * hash function used to create segments at ingestion time can change over time, but we don't guarantee the changed + * hash function is backwards-compatible. The query will process all segments if this function is null. + */ + @Nullable + private final HashPartitionFunction partitionFunction; + @JsonCreator public HashBasedNumberedShardSpec( @JsonProperty("partitionNum") int partitionNum, // partitionId @@ -67,6 +74,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec @JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility @JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, + @JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction, // nullable for backward compatibility @JacksonInject ObjectMapper jsonMapper ) { @@ -76,8 +84,9 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec // If numBuckets is missing, assume that any hash bucket is not empty. // Use the core partition set size as the number of buckets. this.numBuckets = numBuckets == null ? partitions : numBuckets; - this.jsonMapper = jsonMapper; this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions; + this.partitionFunction = partitionFunction; + this.jsonMapper = jsonMapper; } @JsonProperty @@ -98,140 +107,44 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec return partitionDimensions; } + @JsonProperty + public @Nullable HashPartitionFunction getPartitionFunction() + { + return partitionFunction; + } + @Override public List getDomainDimensions() { return partitionDimensions; } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) - { - return getBucketIndex(hash(timestamp, inputRow), numBuckets) == bucketId % numBuckets; - } - - /** - * Check if the current segment possibly holds records if the values of dimensions in {@link #partitionDimensions} - * are of {@code partitionDimensionsValues} - * - * @param partitionDimensionsValues An instance of values of dimensions in {@link #partitionDimensions} - * - * @return Whether the current segment possibly holds records for the given values of partition dimensions - */ - private boolean isInChunk(Map partitionDimensionsValues) - { - assert !partitionDimensions.isEmpty(); - List groupKey = Lists.transform( - partitionDimensions, - o -> Collections.singletonList(partitionDimensionsValues.get(o)) - ); - try { - return getBucketIndex(hash(jsonMapper, groupKey), numBuckets) == bucketId % numBuckets; - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - /** - * This method calculates the hash based on whether {@param partitionDimensions} is null or not. 
- * If yes, then both {@param timestamp} and dimension columns in {@param inputRow} are used {@link Rows#toGroupKey} - * Or else, columns in {@param partitionDimensions} are used - * - * @param timestamp should be bucketed with query granularity - * @param inputRow row from input data - * - * @return hash value - */ - protected int hash(long timestamp, InputRow inputRow) - { - return hash(jsonMapper, partitionDimensions, timestamp, inputRow); - } - - public static int hash(ObjectMapper jsonMapper, List partitionDimensions, long timestamp, InputRow inputRow) - { - final List groupKey = getGroupKey(partitionDimensions, timestamp, inputRow); - try { - return hash(jsonMapper, groupKey); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - public static List getGroupKey(final List partitionDimensions, final long timestamp, final InputRow inputRow) - { - if (partitionDimensions.isEmpty()) { - return Rows.toGroupKey(timestamp, inputRow); - } else { - return Lists.transform(partitionDimensions, inputRow::getDimension); - } - } - - @VisibleForTesting - public static int hash(ObjectMapper jsonMapper, List objects) throws JsonProcessingException - { - return HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(objects)).asInt(); - } - @Override public ShardSpecLookup getLookup(final List shardSpecs) { - return createHashLookup(jsonMapper, partitionDimensions, shardSpecs, numBuckets); - } - - static ShardSpecLookup createHashLookup( - ObjectMapper jsonMapper, - List partitionDimensions, - List shardSpecs, - int numBuckets - ) - { - return (long timestamp, InputRow row) -> { - int index = getBucketIndex(hash(jsonMapper, partitionDimensions, timestamp, row), numBuckets); - return shardSpecs.get(index); - }; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; + // partitionFunction can be null when you read a shardSpec of a segment created in an old version of Druid. + // The current version of Druid will always specify a partitionFunction on newly created segments. + if (partitionFunction == null) { + throw new ISE("Cannot create a hashPartitioner since partitionFunction is null"); } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - HashBasedNumberedShardSpec that = (HashBasedNumberedShardSpec) o; - return bucketId == that.bucketId && - numBuckets == that.numBuckets && - Objects.equals(partitionDimensions, that.partitionDimensions); - } - - @Override - public int hashCode() - { - return Objects.hash(super.hashCode(), bucketId, numBuckets, partitionDimensions); - } - - @Override - public String toString() - { - return "HashBasedNumberedShardSpec{" + - "partitionNum=" + getPartitionNum() + - ", partitions=" + getNumCorePartitions() + - ", bucketId=" + bucketId + - ", numBuckets=" + numBuckets + - ", partitionDimensions=" + partitionDimensions + - '}'; + return new HashPartitioner( + jsonMapper, + partitionFunction, + partitionDimensions, + numBuckets + ).createHashLookup(shardSpecs); } @Override public boolean possibleInDomain(Map> domain) { + // partitionFunction should be used instead of HashPartitioner at query time. + // We should process all segments if partitionFunction is null because we don't know what hash function + // was used to create segments at ingestion time. 
+ if (partitionFunction == null) { + return true; + } + // If no partitionDimensions are specified during ingestion, hash is based on all dimensions plus the truncated // input timestamp according to QueryGranularity instead of just partitionDimensions. Since we don't store in shard // specs the truncated timestamps of the events that fall into the shard after ingestion, there's no way to recover @@ -260,35 +173,36 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec } } - return !domainSet.isEmpty() && chunkPossibleInDomain(domainSet, new HashMap<>()); + return !domainSet.isEmpty() && chunkPossibleInDomain(partitionFunction, domainSet, new HashMap<>()); } /** * Recursively enumerate all possible combinations of values for dimensions in {@link #partitionDimensions} based on * {@code domainSet}, test if any combination matches the current segment * + * @param hashPartitionFunction hash function used to create segments at ingestion time * @param domainSet The set where values of dimensions in {@link #partitionDimensions} are * drawn from * @param partitionDimensionsValues A map from dimensions in {@link #partitionDimensions} to their values drawn from * {@code domainSet} - * * @return Whether the current segment possibly holds records for the provided domain. Return false if and only if * none of the combinations matches this segment */ private boolean chunkPossibleInDomain( + HashPartitionFunction hashPartitionFunction, Map> domainSet, Map partitionDimensionsValues ) { int curIndex = partitionDimensionsValues.size(); if (curIndex == partitionDimensions.size()) { - return isInChunk(partitionDimensionsValues); + return isInChunk(hashPartitionFunction, partitionDimensionsValues); } String dimension = partitionDimensions.get(curIndex); for (String e : domainSet.get(dimension)) { partitionDimensionsValues.put(dimension, e); - if (chunkPossibleInDomain(domainSet, partitionDimensionsValues)) { + if (chunkPossibleInDomain(hashPartitionFunction, domainSet, partitionDimensionsValues)) { return true; } partitionDimensionsValues.remove(dimension); @@ -297,8 +211,73 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec return false; } - private static int getBucketIndex(int hash, int numBuckets) + /** + * Check if the current segment possibly holds records if the values of dimensions in {@link #partitionDimensions} + * are of {@code partitionDimensionsValues} + * + * @param hashPartitionFunction hash function used to create segments at ingestion time + * @param partitionDimensionsValues An instance of values of dimensions in {@link #partitionDimensions} + * + * @return Whether the current segment possibly holds records for the given values of partition dimensions + */ + private boolean isInChunk(HashPartitionFunction hashPartitionFunction, Map partitionDimensionsValues) { - return Math.abs(hash % numBuckets); + assert !partitionDimensions.isEmpty(); + List groupKey = Lists.transform( + partitionDimensions, + o -> Collections.singletonList(partitionDimensionsValues.get(o)) + ); + return hashPartitionFunction.hash(serializeGroupKey(jsonMapper, groupKey), numBuckets) == bucketId; + } + + /** + * Serializes a group key into a byte array. The serialization algorithm can affect hash values of partition keys + * since {@link HashPartitionFunction#hash} takes the result of this method as its input. This means, the returned + * byte array should be backwards-compatible in cases where we need to modify this method. 
+ */ + public static byte[] serializeGroupKey(ObjectMapper jsonMapper, List partitionKeys) + { + try { + return jsonMapper.writeValueAsBytes(partitionKeys); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + HashBasedNumberedShardSpec that = (HashBasedNumberedShardSpec) o; + return bucketId == that.bucketId && + numBuckets == that.numBuckets && + Objects.equals(partitionDimensions, that.partitionDimensions) && + partitionFunction == that.partitionFunction; + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), bucketId, numBuckets, partitionDimensions, partitionFunction); + } + + @Override + public String toString() + { + return "HashBasedNumberedShardSpec{" + + "bucketId=" + bucketId + + ", numBuckets=" + numBuckets + + ", partitionDimensions=" + partitionDimensions + + ", partitionFunction=" + partitionFunction + + '}'; } } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java index 324c0204498..cef6686e9a1 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBucketShardSpec.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.data.input.InputRow; +import com.google.common.base.Preconditions; import java.util.List; import java.util.Objects; @@ -40,6 +40,7 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec partitionDimensions; + private final HashPartitionFunction partitionFunction; private final ObjectMapper jsonMapper; @JsonCreator @@ -47,6 +48,7 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec partitionDimensions, + @JsonProperty("partitionFunction") HashPartitionFunction partitionFunction, @JacksonInject ObjectMapper jsonMapper ) { @@ -55,6 +57,7 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec shardSpecs) { - return HashBasedNumberedShardSpec.createHashLookup(jsonMapper, partitionDimensions, shardSpecs, numBuckets); + return new HashPartitioner( + jsonMapper, + partitionFunction, + partitionDimensions, + numBuckets + ).createHashLookup(shardSpecs); } @Override @@ -108,22 +122,24 @@ public class HashBucketShardSpec implements BucketNumberedShardSpec partitionDimensions; + private final int numBuckets; + + HashPartitioner( + final ObjectMapper jsonMapper, + final HashPartitionFunction hashPartitionFunction, + final List partitionDimensions, + final int numBuckets + ) + { + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); + this.hashPartitionFunction = Preconditions.checkNotNull(hashPartitionFunction, "hashPartitionFunction"); + this.partitionDimensions = Preconditions.checkNotNull(partitionDimensions, "partitionDimensions"); + this.numBuckets = numBuckets; + } + + ShardSpecLookup createHashLookup(final List shardSpecs) + { + Preconditions.checkNotNull(hashPartitionFunction, "hashPartitionFunction"); + return (long timestamp, InputRow row) -> { + int index = hash(timestamp, row); + return 
shardSpecs.get(index); + }; + } + + @VisibleForTesting + int hash(final long timestamp, final InputRow inputRow) + { + return hashPartitionFunction.hash( + HashBasedNumberedShardSpec.serializeGroupKey(jsonMapper, extractKeys(timestamp, inputRow)), + numBuckets + ); + } + + /** + * This method extracts keys for hash partitioning based on whether {@param partitionDimensions} is empty or not. + * If yes, then both {@param timestamp} and dimension values in {@param inputRow} are returned. + * Otherwise, values of {@param partitionDimensions} are returned. + * + * @param timestamp should be bucketed with query granularity + * @param inputRow row from input data + * + * @return a list of values of grouping keys + */ + @VisibleForTesting + List extractKeys(final long timestamp, final InputRow inputRow) + { + return extractKeys(partitionDimensions, timestamp, inputRow); + } + + public static List extractKeys( + final List partitionDimensions, + final long timestamp, + final InputRow inputRow + ) + { + if (partitionDimensions.isEmpty()) { + return Rows.toGroupKey(timestamp, inputRow); + } else { + return Lists.transform(partitionDimensions, inputRow::getDimension); + } + } +} diff --git a/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java index 0f16a427abc..837c8e6f4b8 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java @@ -79,12 +79,6 @@ public final class LinearShardSpec implements ShardSpec return new LinearPartitionChunk<>(partitionNum, obj); } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) - { - return true; - } - @Override public boolean equals(Object o) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java index 1f8498beb4a..b123c24ff26 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java @@ -52,12 +52,6 @@ public class NoneShardSpec implements ShardSpec return new SingleElementPartitionChunk<>(obj); } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) - { - return true; - } - @Override @JsonIgnore public int getPartitionNum() diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java index 55ba3ce200b..808abdec871 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpec.java @@ -146,12 +146,6 @@ public class NumberedOverwriteShardSpec implements OverwriteShardSpec return new NumberedOverwritingPartitionChunk<>(partitionId, obj); } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) - { - return true; - } - @JsonProperty("partitionId") @Override public int getPartitionNum() diff --git a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java index acf091d6dd4..e6ad1e401e8 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java +++ 
b/core/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java @@ -106,12 +106,6 @@ public class NumberedShardSpec implements ShardSpec return NumberedPartitionChunk.make(partitionNum, partitions, obj); } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) - { - return true; - } - @Override public String toString() { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java index a329131e748..00b01f16437 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java @@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; import java.util.List; @@ -90,8 +91,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec shardSpecs) { - return SingleDimensionShardSpec.createLookup(shardSpecs); + return (long timestamp, InputRow row) -> { + for (ShardSpec spec : shardSpecs) { + if (((RangeBucketShardSpec) spec).isInChunk(row)) { + return spec; + } + } + throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs); + }; } @Override diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java index f954f4dc430..5098a3c31dd 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.collect.RangeSet; -import org.apache.druid.data.input.InputRow; import java.util.List; import java.util.Map; @@ -56,9 +55,6 @@ public interface ShardSpec @JsonIgnore PartitionChunk createChunk(T obj); - @JsonIgnore - boolean isInChunk(long timestamp, InputRow inputRow); - /** * Returns the partition ID of this segment. 
*/ diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java index 8deef57da3e..da5024ac991 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java @@ -21,6 +21,7 @@ package org.apache.druid.timeline.partition; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Range; @@ -115,7 +116,7 @@ public class SingleDimensionShardSpec implements ShardSpec { return (long timestamp, InputRow row) -> { for (ShardSpec spec : shardSpecs) { - if (spec.isInChunk(timestamp, row)) { + if (((SingleDimensionShardSpec) spec).isInChunk(row)) { return spec; } } @@ -164,8 +165,8 @@ public class SingleDimensionShardSpec implements ShardSpec } } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) + @VisibleForTesting + boolean isInChunk(InputRow inputRow) { return isInChunk(dimension, start, end, inputRow); } diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java index ec177ac6e48..54836421f0b 100644 --- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java +++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.RangeSet; import org.apache.druid.TestObjectMapper; -import org.apache.druid.data.input.InputRow; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.java.util.common.DateTimes; @@ -64,12 +63,6 @@ public class DataSegmentTest return null; } - @Override - public boolean isInChunk(long timestamp, InputRow inputRow) - { - return false; - } - @Override public int getPartitionNum() { diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java index 287e0dcb004..6262169fbca 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpecTest.java @@ -36,8 +36,15 @@ public class BuildingHashBasedNumberedShardSpecTest public void testConvert() { Assert.assertEquals( - new HashBasedNumberedShardSpec(5, 10, 5, 12, ImmutableList.of("dim"), mapper), - new BuildingHashBasedNumberedShardSpec(5, 5, 12, ImmutableList.of("dim"), mapper).convert(10) + new HashBasedNumberedShardSpec(5, 10, 5, 12, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper), + new BuildingHashBasedNumberedShardSpec( + 5, + 5, + 12, + ImmutableList.of("dim"), + HashPartitionFunction.MURMUR3_32_ABS, + mapper + ).convert(10) ); } @@ -46,8 +53,14 @@ public class BuildingHashBasedNumberedShardSpecTest { Assert.assertEquals( new NumberedPartitionChunk<>(5, 0, "test"), - new BuildingHashBasedNumberedShardSpec(5, 5, 12, ImmutableList.of("dim"), mapper) - .createChunk("test") + 
new BuildingHashBasedNumberedShardSpec( + 5, + 5, + 12, + ImmutableList.of("dim"), + HashPartitionFunction.MURMUR3_32_ABS, + mapper + ).createChunk("test") ); } @@ -63,6 +76,7 @@ public class BuildingHashBasedNumberedShardSpecTest 5, 12, ImmutableList.of("dim"), + HashPartitionFunction.MURMUR3_32_ABS, mapper ); final String json = mapper.writeValueAsString(original); diff --git a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java index ba2cb7e5036..ed6e4d62f1b 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpecTest.java @@ -47,7 +47,8 @@ public class HashBasedNumberedPartialShardSpecTest final HashBasedNumberedPartialShardSpec expected = new HashBasedNumberedPartialShardSpec( ImmutableList.of("dim1", "dim2"), 1, - 3 + 3, + HashPartitionFunction.MURMUR3_32_ABS ); final byte[] json = MAPPER.writeValueAsBytes(expected); final HashBasedNumberedPartialShardSpec fromJson = (HashBasedNumberedPartialShardSpec) MAPPER.readValue( @@ -63,17 +64,19 @@ public class HashBasedNumberedPartialShardSpecTest final HashBasedNumberedPartialShardSpec expected = new HashBasedNumberedPartialShardSpec( ImmutableList.of("dim1", "dim2"), 1, - 3 + 3, + HashPartitionFunction.MURMUR3_32_ABS ); final byte[] json = MAPPER.writeValueAsBytes(expected); //noinspection unchecked final Map map = MAPPER.readValue(json, Map.class); - Assert.assertEquals(4, map.size()); + Assert.assertEquals(5, map.size()); Assert.assertEquals(HashBasedNumberedPartialShardSpec.TYPE, map.get("type")); Assert.assertEquals(expected.getPartitionDimensions(), map.get("partitionDimensions")); Assert.assertEquals(expected.getBucketId(), map.get("bucketId")); Assert.assertEquals(expected.getNumBuckets(), map.get("numPartitions")); Assert.assertEquals(expected.getBucketId(), map.get("bucketId")); + Assert.assertEquals(expected.getPartitionFunction().toString(), map.get("partitionFunction")); } @Test @@ -82,11 +85,12 @@ public class HashBasedNumberedPartialShardSpecTest final HashBasedNumberedPartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec( ImmutableList.of("dim"), 2, - 4 + 4, + null ); final ShardSpec shardSpec = partialShardSpec.complete(MAPPER, 1, 3); Assert.assertEquals( - new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), MAPPER), + new HashBasedNumberedShardSpec(1, 3, 2, 4, ImmutableList.of("dim"), null, MAPPER), shardSpec ); } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java index 885b1fdeca8..74a2bb1a231 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java @@ -32,7 +32,6 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; @@ -69,6 +68,7 @@ public class HashBasedNumberedShardSpecTest 1, 3, ImmutableList.of("visitor_id"), + 
HashPartitionFunction.MURMUR3_32_ABS, objectMapper ) ), @@ -79,6 +79,10 @@ public class HashBasedNumberedShardSpecTest Assert.assertEquals(1, ((HashBasedNumberedShardSpec) spec).getBucketId()); Assert.assertEquals(3, ((HashBasedNumberedShardSpec) spec).getNumBuckets()); Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) spec).getPartitionDimensions()); + Assert.assertEquals( + HashPartitionFunction.MURMUR3_32_ABS, + ((HashBasedNumberedShardSpec) spec).getPartitionFunction() + ); } @Test @@ -102,15 +106,16 @@ public class HashBasedNumberedShardSpecTest ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionDimensions() ); + Assert.assertNull(((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionFunction()); } @Test public void testPartitionChunks() { final List specs = ImmutableList.of( - new HashBasedNumberedShardSpec(0, 3, 0, 3, null, objectMapper), - new HashBasedNumberedShardSpec(1, 3, 1, 3, null, objectMapper), - new HashBasedNumberedShardSpec(2, 3, 2, 3, null, objectMapper) + new HashBasedNumberedShardSpec(0, 3, 0, 3, null, null, objectMapper), + new HashBasedNumberedShardSpec(1, 3, 1, 3, null, null, objectMapper), + new HashBasedNumberedShardSpec(2, 3, 2, 3, null, null, objectMapper) ); final List> chunks = Lists.transform( @@ -149,37 +154,56 @@ public class HashBasedNumberedShardSpecTest Assert.assertFalse(chunks.get(2).abuts(chunks.get(2))); } + private HashPartitioner createHashPartitionerForHashInputRow(int numBuckets) + { + return new HashPartitioner( + objectMapper, + HashPartitionFunction.MURMUR3_32_ABS, + ImmutableList.of(), + numBuckets + ) + { + @Override + int hash(final long timestamp, final InputRow inputRow) + { + return Math.abs(inputRow.hashCode() % numBuckets); + } + }; + } + @Test public void testIsInChunk() { - List specs = new ArrayList<>(); + List specs = new ArrayList<>(); for (int i = 0; i < 3; i++) { - specs.add(new HashOverridenShardSpec(i, 3)); + specs.add(newShardSpecForTesting(i, 3)); } + final HashPartitioner hashPartitioner = createHashPartitionerForHashInputRow(3); - assertExistsInOneSpec(specs, new HashInputRow(Integer.MIN_VALUE)); - assertExistsInOneSpec(specs, new HashInputRow(Integer.MAX_VALUE)); - assertExistsInOneSpec(specs, new HashInputRow(0)); - assertExistsInOneSpec(specs, new HashInputRow(1000)); - assertExistsInOneSpec(specs, new HashInputRow(-1000)); + Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(Integer.MIN_VALUE))); + Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(Integer.MAX_VALUE))); + Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(0))); + Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(1000))); + Assert.assertTrue(existsInOneSpec(specs, hashPartitioner, new HashInputRow(-1000))); } @Test public void testIsInChunkWithMorePartitionsBeyondNumBucketsReturningTrue() { final int numBuckets = 3; - final List specs = IntStream.range(0, 10) - .mapToObj(i -> new HashOverridenShardSpec(i, numBuckets)) - .collect(Collectors.toList()); + final List specs = IntStream.range(0, 10) + .mapToObj(i -> newShardSpecForTesting(i, numBuckets)) + .collect(Collectors.toList()); + final HashPartitioner hashPartitioner = createHashPartitionerForHashInputRow(numBuckets); for (int i = 0; i < 10; i++) { final InputRow row = new HashInputRow(numBuckets * 10000 + i); - Assert.assertTrue(specs.get(i).isInChunk(row.getTimestampFromEpoch(), row)); + 
Assert.assertTrue(isInChunk(specs.get(i), hashPartitioner, row.getTimestampFromEpoch(), row)); } } @Test - public void testGetGroupKey() + public void testExtractKeys() { final List partitionDimensions1 = ImmutableList.of("visitor_id"); final DateTime time = DateTimes.nowUtc(); @@ -190,16 +214,26 @@ public class HashBasedNumberedShardSpecTest ); Assert.assertEquals( ImmutableList.of(Collections.singletonList("v1")), - HashBasedNumberedShardSpec.getGroupKey(partitionDimensions1, time.getMillis(), inputRow) + new HashPartitioner( + objectMapper, + HashPartitionFunction.MURMUR3_32_ABS, + partitionDimensions1, + 0 // not used + ).extractKeys(time.getMillis(), inputRow) ); Assert.assertEquals( ImmutableList.of( - time.getMillis(), - ImmutableMap.of("cnt", Collections.singletonList(10), "visitor_id", Collections.singletonList("v1"))) - .toString(), + time.getMillis(), + ImmutableMap.of("cnt", Collections.singletonList(10), "visitor_id", Collections.singletonList("v1")) + ).toString(), // empty list when partitionDimensions is null - HashBasedNumberedShardSpec.getGroupKey(ImmutableList.of(), time.getMillis(), inputRow).toString() + new HashPartitioner( + objectMapper, + HashPartitionFunction.MURMUR3_32_ABS, + ImmutableList.of(), + 0 // not used + ).extractKeys(time.getMillis(), inputRow).toString() ); } @@ -212,89 +246,150 @@ public class HashBasedNumberedShardSpecTest 1, 3, ImmutableList.of("visitor_id"), + null, objectMapper ); Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); - Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null))); Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); } @Test - public void testPossibleInDomain() + public void testPossibleInDomainWithNullHashPartitionFunctionReturnAll() { final RangeSet rangeSet = TreeRangeSet.create(); rangeSet.add(Range.closed("123", "123")); final Map> domain = ImmutableMap.of("visitor_id", rangeSet); - // Without partition info - HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec( - 0, - 1, - 0, - 1, - ImmutableList.of(), - objectMapper - ); - Assert.assertTrue(shardSpec.possibleInDomain(domain)); - - // With partition info and matching partition dimensions final int numBuckets = 3; - List shardSpecs = ImmutableList.of( - new HashBasedNumberedShardSpec( - 0, - numBuckets, - 0, - numBuckets, - ImmutableList.of("visitor_id"), - objectMapper - ), - new HashBasedNumberedShardSpec( - 1, - numBuckets, - 1, - numBuckets, - ImmutableList.of("visitor_id"), - objectMapper - ), - new HashBasedNumberedShardSpec( - 2, - numBuckets, - 2, - numBuckets, - ImmutableList.of("visitor_id"), - objectMapper - ) - ); - Assert.assertEquals(1, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count()); + final List shardSpecs = new ArrayList<>(); + for (int i = 0; i < numBuckets; i++) { + shardSpecs.add( + new HashBasedNumberedShardSpec( + i, + numBuckets, + i, + numBuckets, + ImmutableList.of("visitor_id"), + null, + objectMapper + ) + ); + } + Assert.assertEquals(numBuckets, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count()); + } - // Partition dimensions not match + @Test + public void testPossibleInDomainWithoutPartitionDimensionsReturnAll() + { + final RangeSet rangeSet = 
TreeRangeSet.create(); + rangeSet.add(Range.closed("123", "123")); + final Map> domain = ImmutableMap.of("visitor_id", rangeSet); + + final int numBuckets = 3; + final List shardSpecs = new ArrayList<>(); + for (int i = 0; i < numBuckets; i++) { + shardSpecs.add( + new HashBasedNumberedShardSpec( + i, + numBuckets, + i, + numBuckets, + ImmutableList.of(), + HashPartitionFunction.MURMUR3_32_ABS, + objectMapper + ) + ); + } + Assert.assertEquals(numBuckets, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count()); + } + + @Test + public void testPossibleInDomainFilterOnPartitionDimensionsReturnPrunedShards() + { + final RangeSet rangeSet = TreeRangeSet.create(); + rangeSet.add(Range.closed("123", "123")); + final Map> domain = ImmutableMap.of("visitor_id", rangeSet); + + final int numBuckets = 3; + final List shardSpecs = new ArrayList<>(); + for (int i = 0; i < numBuckets; i++) { + shardSpecs.add( + new HashBasedNumberedShardSpec( + i, + numBuckets, + i, + numBuckets, + ImmutableList.of("visitor_id"), + HashPartitionFunction.MURMUR3_32_ABS, + objectMapper + ) + ); + } + Assert.assertEquals(1, shardSpecs.stream().filter(s -> s.possibleInDomain(domain)).count()); + } + + @Test + public void testPossibleInDomainFilterOnNonPartitionDimensionsReturnAll() + { + final RangeSet rangeSet = TreeRangeSet.create(); + rangeSet.add(Range.closed("123", "123")); final Map> domain1 = ImmutableMap.of("vistor_id_1", rangeSet); + final int numBuckets = 3; + final List shardSpecs = new ArrayList<>(); + for (int i = 0; i < numBuckets; i++) { + shardSpecs.add( + new HashBasedNumberedShardSpec( + i, + numBuckets, + i, + numBuckets, + ImmutableList.of("visitor_id"), + HashPartitionFunction.MURMUR3_32_ABS, + objectMapper + ) + ); + } Assert.assertEquals(shardSpecs.size(), shardSpecs.stream().filter(s -> s.possibleInDomain(domain1)).count()); } - public boolean assertExistsInOneSpec(List specs, InputRow row) + public boolean existsInOneSpec( + List specs, + HashPartitioner hashPartitioner, + InputRow row + ) { - for (ShardSpec spec : specs) { - if (spec.isInChunk(row.getTimestampFromEpoch(), row)) { + for (HashBasedNumberedShardSpec spec : specs) { + if (isInChunk(spec, hashPartitioner, row.getTimestampFromEpoch(), row)) { return true; } } - throw new ISE("None of the partition matches"); + return false; } - public class HashOverridenShardSpec extends HashBasedNumberedShardSpec + private boolean isInChunk( + HashBasedNumberedShardSpec shardSpec, + HashPartitioner hashPartitioner, + long timestamp, + InputRow inputRow + ) { - public HashOverridenShardSpec(int partitionNum, int partitions) - { - super(partitionNum, partitions, partitionNum, partitions, null, objectMapper); - } + final int bucketId = hashPartitioner.hash(timestamp, inputRow); + return bucketId == shardSpec.getBucketId(); + } - @Override - protected int hash(long timestamp, InputRow inputRow) - { - return inputRow.hashCode(); - } + private HashBasedNumberedShardSpec newShardSpecForTesting(int partitionNum, int partitions) + { + return new HashBasedNumberedShardSpec( + partitionNum, + partitions, + partitionNum % partitions, + partitions, + null, + null, + objectMapper + ); } public static class HashInputRow implements InputRow diff --git a/core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java index df2207b798f..146157b8cea 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java +++ 
b/core/src/test/java/org/apache/druid/timeline/partition/HashBucketShardSpecTest.java @@ -36,13 +36,20 @@ import java.util.List; public class HashBucketShardSpecTest { private final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper(); - + @Test public void testConvert() { Assert.assertEquals( - new BuildingHashBasedNumberedShardSpec(3, 5, 12, ImmutableList.of("dim"), mapper), - new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper).convert(3) + new BuildingHashBasedNumberedShardSpec( + 3, + 5, + 12, + ImmutableList.of("dim"), + HashPartitionFunction.MURMUR3_32_ABS, + mapper + ), + new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper).convert(3) ); } @@ -51,7 +58,13 @@ public class HashBucketShardSpecTest { Assert.assertEquals( new NumberedPartitionChunk<>(5, 0, "test"), - new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper).createChunk("test") + new HashBucketShardSpec( + 5, + 12, + ImmutableList.of("dim"), + HashPartitionFunction.MURMUR3_32_ABS, + mapper + ).createChunk("test") ); } @@ -59,9 +72,9 @@ public class HashBucketShardSpecTest public void testShardSpecLookup() { final List shardSpecs = ImmutableList.of( - new HashBucketShardSpec(0, 3, ImmutableList.of("dim"), mapper), - new HashBucketShardSpec(1, 3, ImmutableList.of("dim"), mapper), - new HashBucketShardSpec(2, 3, ImmutableList.of("dim"), mapper) + new HashBucketShardSpec(0, 3, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper), + new HashBucketShardSpec(1, 3, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper), + new HashBucketShardSpec(2, 3, ImmutableList.of("dim"), HashPartitionFunction.MURMUR3_32_ABS, mapper) ); final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs); final long currentTime = DateTimes.nowUtc().getMillis(); @@ -103,7 +116,13 @@ public class HashBucketShardSpecTest mapper.registerSubtypes(new NamedType(HashBucketShardSpec.class, HashBucketShardSpec.TYPE)); mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper)); - final HashBucketShardSpec original = new HashBucketShardSpec(5, 12, ImmutableList.of("dim"), mapper); + final HashBucketShardSpec original = new HashBucketShardSpec( + 5, + 12, + ImmutableList.of("dim"), + HashPartitionFunction.MURMUR3_32_ABS, + mapper + ); final String json = mapper.writeValueAsString(original); final HashBucketShardSpec fromJson = (HashBucketShardSpec) mapper.readValue(json, ShardSpec.class); Assert.assertEquals(original, fromJson); diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java index 5efe592fabf..4079143fe29 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedOverwriteShardSpecTest.java @@ -62,7 +62,7 @@ public class NumberedOverwriteShardSpecTest (short) 1 ); Assert.assertFalse(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); - Assert.assertFalse(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertFalse(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null))); Assert.assertFalse(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); Assert.assertTrue(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); } 
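[Editor's note] For reference, the hashing flow that the `HashPartitioner`, `serializeGroupKey`, and `murmur3_32_abs` changes above boil down to can be sketched in a few lines. This is an illustrative sketch only, not part of the patch: the class and method names (`Murmur3AbsBucketSketch`, `computeBucket`) are made up for the example, it assumes only Guava and Jackson on the classpath, and it follows the legacy `getBucketIndex` behavior removed above (absolute value applied after the modulo), since `HashPartitionFunction`'s implementation is not shown in this diff.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.Hashing;

import java.util.Collections;
import java.util.List;

// Illustrative sketch of the murmur3_32_abs behavior described in this patch:
// bucket = abs(murmur3_32(jsonSerialize(groupKey)) % numBuckets).
public class Murmur3AbsBucketSketch
{
  static int computeBucket(ObjectMapper jsonMapper, List<?> groupKey, int numBuckets)
  {
    final byte[] serialized;
    try {
      // Mirrors HashBasedNumberedShardSpec#serializeGroupKey: the group key is
      // serialized to JSON bytes before hashing.
      serialized = jsonMapper.writeValueAsBytes(groupKey);
    }
    catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
    return Math.abs(Hashing.murmur3_32().hashBytes(serialized).asInt() % numBuckets);
  }

  public static void main(String[] args)
  {
    // For partitionDimensions = ["visitor_id"], the group key is a list of
    // singleton lists of dimension values, e.g. [["123"]].
    final List<List<String>> groupKey = Collections.singletonList(Collections.singletonList("123"));
    System.out.println(computeBucket(new ObjectMapper(), groupKey, 3)); // bucket id in [0, 3)
  }
}
```

With `partitionDimensions = ["visitor_id"]` and three buckets, a broker handling a query that filters on `visitor_id = '123'` only needs the segment whose `bucketId` matches the value computed here, which is the pruning behavior described in the native-batch.md change later in this patch.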
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/NumberedShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/NumberedShardSpecTest.java index 8b8d5da1b2c..1c77a7f8040 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/NumberedShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/NumberedShardSpecTest.java @@ -198,7 +198,7 @@ public class NumberedShardSpecTest { final NumberedShardSpec shardSpec = new NumberedShardSpec(0, 1); Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); - Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null))); Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1))); } diff --git a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java index bf34aaa6aaa..58e1fbd1b13 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java @@ -47,9 +47,9 @@ public class PartitionHolderCompletenessTest new Object[]{ // Simulate empty hash buckets ImmutableList.of( - new HashBasedNumberedShardSpec(2, 3, 3, 5, null, new ObjectMapper()), - new HashBasedNumberedShardSpec(0, 3, 0, 5, null, new ObjectMapper()), - new HashBasedNumberedShardSpec(1, 3, 2, 5, null, new ObjectMapper()) + new HashBasedNumberedShardSpec(2, 3, 3, 5, null, null, new ObjectMapper()), + new HashBasedNumberedShardSpec(0, 3, 0, 5, null, null, new ObjectMapper()), + new HashBasedNumberedShardSpec(1, 3, 2, 5, null, null, new ObjectMapper()) ), HashBasedNumberedShardSpec.class.getSimpleName() }, diff --git a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java index a8f1aa9d33e..a579245528c 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java @@ -105,7 +105,7 @@ public class SingleDimensionShardSpecTest ImmutableList.of("billy"), Maps.transformValues(pair.rhs, input -> input) ); - Assert.assertEquals(StringUtils.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow.getTimestampFromEpoch(), inputRow)); + Assert.assertEquals(StringUtils.format("spec[%s], row[%s]", spec, inputRow), pair.lhs, spec.isInChunk(inputRow)); } } } @@ -145,7 +145,7 @@ public class SingleDimensionShardSpecTest { final SingleDimensionShardSpec shardSpec = makeSpec("start", "end"); Assert.assertTrue(shardSpec.sharePartitionSpace(NumberedPartialShardSpec.instance())); - Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1))); + Assert.assertTrue(shardSpec.sharePartitionSpace(new HashBasedNumberedPartialShardSpec(null, 0, 1, null))); Assert.assertTrue(shardSpec.sharePartitionSpace(new SingleDimensionPartialShardSpec("dim", 0, null, null, 1))); Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 
2, 1))); } diff --git a/docs/ingestion/hadoop.md b/docs/ingestion/hadoop.md index 0f5db6a33b7..088bdceee76 100644 --- a/docs/ingestion/hadoop.md +++ b/docs/ingestion/hadoop.md @@ -387,7 +387,18 @@ The configuration options are: |maxRowsPerSegment|Deprecated. Renamed to `targetRowsPerSegment`. Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or `numShards`| |numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or `maxRowsPerSegment`| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `numShards`, will be ignored when `targetRowsPerSegment` is set.|no| +|partitionFunction|A function to compute hash of partition dimensions. See [Hash partition function](#hash-partition-function)|`murmur3_32_abs`|no| +##### Hash partition function + +In hash partitioning, the partition function is used to compute hash of partition dimensions. The partition dimension +values are first serialized into a byte array as a whole, and then the partition function is applied to compute hash of +the byte array. +Druid currently supports only one partition function. + +|name|description| +|----|-----------| +|`murmur3_32_abs`|Applies an absolute value function to the result of [`murmur3_32`](https://guava.dev/releases/16.0/api/docs/com/google/common/hash/Hashing.html#murmur3_32()).| ### Single-dimension range partitioning diff --git a/docs/ingestion/index.md b/docs/ingestion/index.md index a88e2f27eb4..0925675f920 100644 --- a/docs/ingestion/index.md +++ b/docs/ingestion/index.md @@ -90,7 +90,7 @@ This table compares the three available options: | **Input locations** | Any [`inputSource`](./native-batch.md#input-sources). | Any Hadoop FileSystem or Druid datasource. | Any [`inputSource`](./native-batch.md#input-sources). | | **File formats** | Any [`inputFormat`](./data-formats.md#input-format). | Any Hadoop InputFormat. | Any [`inputFormat`](./data-formats.md#input-format). | | **[Rollup modes](#rollup)** | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | Always perfect. | Perfect if `forceGuaranteedRollup` = true in the [`tuningConfig`](native-batch.md#tuningconfig). | -| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | +| **Partitioning options** | Dynamic, hash-based, and range-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec) for details. | Hash-based or range-based partitioning via [`partitionsSpec`](hadoop.md#partitionsspec). | Dynamic and hash-based partitioning methods are available. See [Partitions Spec](./native-batch.md#partitionsspec-1) for details. 
| diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 5a58a7f204e..e1d29304037 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -257,10 +257,10 @@ For perfect rollup, you should use either `hashed` (partitioning based on the ha The three `partitionsSpec` types have different characteristics. -| PartitionsSpec | Ingestion speed | Partitioning method | Supported rollup mode | Segment pruning at query time | +| PartitionsSpec | Ingestion speed | Partitioning method | Supported rollup mode | Secondary partition pruning at query time | |----------------|-----------------|---------------------|-----------------------|-------------------------------| | `dynamic` | Fastest | Partitioning based on number of rows in segment. | Best-effort rollup | N/A | -| `hashed` | Moderate | Partitioning based on the hash value of partition dimensions. This partitioning may reduce your datasource size and query latency by improving data locality. See [Partitioning](./index.md#partitioning) for more details. | Perfect rollup | The broker can use the partition information to prune segments early to speed up queries if `partitionDimensions` is explicitly specified during ingestion. Since the broker knows how to hash `partitionDimensions` values to locate a segment, given a query including a filter on all the `partitionDimensions`, the broker can pick up only the segments holding the rows satisfying the filter on `partitionDimensions` for query processing. | +| `hashed` | Moderate | Partitioning based on the hash value of partition dimensions. This partitioning may reduce your datasource size and query latency by improving data locality. See [Partitioning](./index.md#partitioning) for more details. | Perfect rollup | The broker can use the partition information to prune segments early to speed up queries. Since the broker knows how to hash `partitionDimensions` values to locate a segment, given a query including a filter on all the `partitionDimensions`, the broker can pick up only the segments holding the rows satisfying the filter on `partitionDimensions` for query processing.

Note that `partitionDimensions` must be set at ingestion time to enable secondary partition pruning at query time.| | `single_dim` | Slowest | Range partitioning based on the value of the partition dimension. Segment sizes may be skewed depending on the partition key distribution. This may reduce your datasource size and query latency by improving data locality. See [Partitioning](./index.md#partitioning) for more details. | Perfect rollup | The broker can use the partition information to prune segments early to speed up queries. Since the broker knows the range of `partitionDimension` values in each segment, given a query including a filter on the `partitionDimension`, the broker can pick up only the segments holding the rows satisfying the filter on `partitionDimension` for query processing. | The recommended use case for each partitionsSpec is: @@ -294,9 +294,10 @@ How the worker task creates segments is: |property|description|default|required?| |--------|-----------|-------|---------| |type|This should always be `hashed`|none|yes| -|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|null|no| -|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| +|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|none|no| |targetRowsPerSegment|A target row count for each partition. If `numShards` is left unspecified, the Parallel task will determine a partition count automatically such that each partition has a row count close to the target, assuming evenly distributed keys in the input data. A target per-segment row count of 5 million is used if both `numShards` and `targetRowsPerSegment` are null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are null)|no| +|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| +|partitionFunction|A function to compute hash of partition dimensions. See [Hash partition function](#hash-partition-function)|`murmur3_32_abs`|no| The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce). The task runs in up to 3 phases: `partial dimension cardinality`, `partial segment generation` and `partial segment merge`. @@ -320,10 +321,21 @@ the time chunk and the hash value of `partitionDimensions` to be merged; each wo falling in the same time chunk and the same hash value from multiple MiddleManager/Indexer processes and merges them to create the final segments. Finally, they push the final segments to the deep storage at once. +##### Hash partition function + +In hash partitioning, the partition function is used to compute hash of partition dimensions. The partition dimension +values are first serialized into a byte array as a whole, and then the partition function is applied to compute hash of +the byte array. +Druid currently supports only one partition function. 
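As a rough sketch of the idea (not Druid's actual implementation), the snippet below shows how a `murmur3_32_abs`-style function could map a serialized group key (the partition dimension values as a byte array) to a bucket id, assuming Guava's `Hashing.murmur3_32()` is available; the supported function itself is listed in the table that follows.

```java
// Hedged illustration only, not Druid's HashPartitionFunction implementation.
// Maps a serialized group key to a bucket id the way a murmur3_32_abs-style
// function could, assuming Guava is on the classpath.
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;

public class Murmur3AbsSketch
{
  static int bucket(byte[] serializedGroupKey, int numBuckets)
  {
    // murmur3_32 may return a negative int; take the absolute value before
    // reducing modulo the bucket count. (A production implementation would
    // also guard against Integer.MIN_VALUE, whose absolute value overflows.)
    final int hash = Math.abs(Hashing.murmur3_32().hashBytes(serializedGroupKey).asInt());
    return hash % numBuckets;
  }

  public static void main(String[] args)
  {
    // Hypothetical group key: partition dimension values serialized as JSON.
    final byte[] key = "[[\"a\"],[\"x\"]]".getBytes(StandardCharsets.UTF_8);
    System.out.println(bucket(key, 6)); // prints a bucket id in [0, 6)
  }
}
```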
+ +|name|description| +|----|-----------| +|`murmur3_32_abs`|Applies an absolute value function to the result of [`murmur3_32`](https://guava.dev/releases/16.0/api/docs/com/google/common/hash/Hashing.html#murmur3_32()).| + #### Single-dimension range partitioning > Single dimension range partitioning is currently not supported in the sequential mode of the Parallel task. -Try set `maxNumConcurrentSubTasks` to larger than 1 to use this partitioning. +The Parallel task will use one subtask when you set `maxNumConcurrentSubTasks` to 1. |property|description|default|required?| |--------|-----------|-------|---------| @@ -744,6 +756,7 @@ For perfect rollup, you should use `hashed`. |maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| |numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no| +|partitionFunction|A function to compute hash of partition dimensions. See [Hash partition function](#hash-partition-function)|`murmur3_32_abs`|no| For best-effort rollup, you should use `dynamic`. diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index f345b62c1c4..355a8204672 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -58,6 +58,7 @@ These parameters apply to all query types. |parallelMergeInitialYieldRows|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.html#broker) for more details.| |parallelMergeSmallBatchRows|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.html#broker) for more details.| |useFilterCNF|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects.| +|secondaryPartitionPruning|`true`|Enable secondary partition pruning on the Broker. 
The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.| ## Query-type-specific parameters diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index 766f5109269..9694c2f4629 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -140,7 +140,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ), @@ -151,7 +151,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ), @@ -162,7 +162,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ) @@ -175,7 +175,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ), @@ -186,7 +186,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ) @@ -209,7 +209,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ) @@ -225,7 +225,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ) @@ -246,7 +246,7 @@ public class MaterializedViewSupervisorTest ImmutableMap.of(), ImmutableList.of("dim1", "dim2"), ImmutableList.of("m1"), - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), 9, 1024 ) diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 07006a8a88a..0df931cc414 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -197,6 +197,11 @@ hamcrest-core test + + org.mockito + mockito-core + test + diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index b3906191902..b1ec5460993 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -29,6 +29,8 @@ import com.google.common.io.Closeables; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.Rows; import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -36,6 +38,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -169,6 +172,15 @@ public class DetermineHashedPartitionsJob implements Jobby log.info("Determined Intervals for Job [%s].", config.getSegmentGranularIntervals()); } Map> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance()); + PartitionsSpec partitionsSpec = config.getPartitionsSpec(); + if (!(partitionsSpec instanceof HashedPartitionsSpec)) { + throw new ISE( + "%s is expected, but got %s", + HashedPartitionsSpec.class.getName(), + partitionsSpec.getClass().getName() + ); + } + HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction(); int shardCount = 0; for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { DateTime bucket = segmentGranularity.getStart(); @@ -199,6 +211,7 @@ public class DetermineHashedPartitionsJob implements Jobby i, numberOfShards, null, + partitionFunction, HadoopDruidIndexerConfig.JSON_MAPPER ), shardCount++ diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java index defb45f1aaa..a4f719a4c21 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java @@ -27,6 +27,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -61,13 +62,16 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby } else { final PartitionsSpec partitionsSpec = config.getPartitionsSpec(); final int shardsPerInterval; + final HashPartitionFunction partitionFunction; if (partitionsSpec instanceof HashedPartitionsSpec) { final HashedPartitionsSpec hashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec; shardsPerInterval = PartitionsSpec.isEffectivelyNull(hashedPartitionsSpec.getNumShards()) ? 
1 : hashedPartitionsSpec.getNumShards(); + partitionFunction = hashedPartitionsSpec.getPartitionFunction(); } else { shardsPerInterval = 1; + partitionFunction = null; } Map> shardSpecs = new TreeMap<>(); int shardCount = 0; @@ -84,6 +88,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby i, shardsPerInterval, config.getPartitionsSpec().getPartitionDimensions(), + partitionFunction, HadoopDruidIndexerConfig.JSON_MAPPER ), shardCount++ diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 4ac4f6827ba..4b8d72e7884 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -55,6 +55,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; @@ -489,7 +490,15 @@ public class BatchDeltaIngestionTest INTERVAL_FULL.getStartMillis(), ImmutableList.of( new HadoopyShardSpec( - new HashBasedNumberedShardSpec(0, 1, 0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER), + new HashBasedNumberedShardSpec( + 0, + 1, + 0, + 1, + null, + HashPartitionFunction.MURMUR3_32_ABS, + HadoopDruidIndexerConfig.JSON_MAPPER + ), 0 ) ) diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index f15bc2923a8..93eda943424 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -36,6 +36,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; @@ -43,6 +45,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.File; import java.util.Arrays; import java.util.Collection; @@ -78,7 +81,8 @@ public class DetermineHashedPartitionsJobTest 0, 1, first, - Granularities.DAY + Granularities.DAY, + null }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), @@ -87,7 +91,8 @@ public class DetermineHashedPartitionsJobTest 0, 6, second, - Granularities.DAY + Granularities.DAY, + null }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), @@ -96,7 +101,8 @@ public class DetermineHashedPartitionsJobTest 0, 6, third, - Granularities.DAY + Granularities.DAY, + null }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), @@ -105,7 +111,18 @@ public class 
DetermineHashedPartitionsJobTest 0, 6, third, - Granularities.DAY + Granularities.DAY, + null + }, + { + DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + 1, + null, + 0, + 6, + third, + Granularities.DAY, + HashPartitionFunction.MURMUR3_32_ABS }, { DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.rows.in.timezone.tsv").getPath(), @@ -114,7 +131,8 @@ public class DetermineHashedPartitionsJobTest 0, 1, first, - new PeriodGranularity(new Period("P1D"), null, DateTimes.inferTzFromString("America/Los_Angeles")) + new PeriodGranularity(new Period("P1D"), null, DateTimes.inferTzFromString("America/Los_Angeles")), + null } } ); @@ -127,7 +145,8 @@ public class DetermineHashedPartitionsJobTest int errorMargin, int expectedNumTimeBuckets, int[] expectedNumOfShards, - Granularity segmentGranularity + Granularity segmentGranularity, + @Nullable HashPartitionFunction partitionFunction ) { this.expectedNumOfShards = expectedNumOfShards; @@ -194,7 +213,7 @@ public class DetermineHashedPartitionsJobTest new HadoopTuningConfig( tmpDir.getAbsolutePath(), null, - new HashedPartitionsSpec(targetPartitionSize, null, null), + new HashedPartitionsSpec(targetPartitionSize, null, null, partitionFunction), null, null, null, @@ -226,6 +245,8 @@ public class DetermineHashedPartitionsJobTest { DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig); determineHashedPartitionsJob.run(); + HashPartitionFunction expectedFunction = ((HashedPartitionsSpec) indexerConfig.getPartitionsSpec()) + .getPartitionFunction(); Map> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs(); Assert.assertEquals( expectedNumTimeBuckets, @@ -238,6 +259,10 @@ public class DetermineHashedPartitionsJobTest entry.getValue().size(), errorMargin ); + for (HadoopyShardSpec eachShardSpec : entry.getValue()) { + final HashBasedNumberedShardSpec hashShardSpec = (HashBasedNumberedShardSpec) eachShardSpec.getActualSpec(); + Assert.assertEquals(expectedFunction, hashShardSpec.getPartitionFunction()); + } } } } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java new file mode 100644 index 00000000000..3c809d26018 --- /dev/null +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class HadoopDruidDetermineConfigurationJobTest +{ + @Test + public void testRunWithHashedPartitionsSpecCreateHashBasedNumberedShardSpecWithHashPartitionFunction() + { + final Set intervals = ImmutableSet.of( + Intervals.of("2020-01-01/P1D"), + Intervals.of("2020-01-02/P1D"), + Intervals.of("2020-01-03/P1D") + ); + final HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec( + null, + 2, + null, + HashPartitionFunction.MURMUR3_32_ABS, + null, + null + ); + final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); + Mockito.when(config.isDeterminingPartitions()).thenReturn(false); + Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); + Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); + final ArgumentCaptor>> resultCaptor = ArgumentCaptor.forClass(Map.class); + Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); + + final HadoopDruidDetermineConfigurationJob job = new HadoopDruidDetermineConfigurationJob(config); + Assert.assertTrue(job.run()); + final Map> shardSpecs = resultCaptor.getValue(); + Assert.assertEquals(3, shardSpecs.size()); + for (Interval interval : intervals) { + final List shardSpecsPerInterval = shardSpecs.get(interval.getStartMillis()); + Assert.assertEquals(2, shardSpecsPerInterval.size()); + for (int i = 0; i < shardSpecsPerInterval.size(); i++) { + Assert.assertEquals( + new HashBasedNumberedShardSpec( + i, + shardSpecsPerInterval.size(), + i, + shardSpecsPerInterval.size(), + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ), + shardSpecsPerInterval.get(i).getActualSpec() + ); + } + } + } + + @Test + public void testRunWithSingleDimensionPartitionsSpecCreateHashBasedNumberedShardSpecWithoutHashPartitionFunction() + { + final Set intervals = ImmutableSet.of( + Intervals.of("2020-01-01/P1D"), + Intervals.of("2020-01-02/P1D"), + Intervals.of("2020-01-03/P1D") + ); + final SingleDimensionPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(1000, null, "dim", false); + final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); + Mockito.when(config.isDeterminingPartitions()).thenReturn(false); + Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); + Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); + final ArgumentCaptor>> resultCaptor = ArgumentCaptor.forClass(Map.class); + Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); + + final HadoopDruidDetermineConfigurationJob job = new HadoopDruidDetermineConfigurationJob(config); + Assert.assertTrue(job.run()); + final Map> shardSpecs = resultCaptor.getValue(); + Assert.assertEquals(3, 
shardSpecs.size()); + for (Interval interval : intervals) { + final List shardSpecsPerInterval = shardSpecs.get(interval.getStartMillis()); + Assert.assertEquals(1, shardSpecsPerInterval.size()); + Assert.assertEquals( + new HashBasedNumberedShardSpec( + 0, + shardSpecsPerInterval.size(), + 0, + shardSpecsPerInterval.size(), + ImmutableList.of("dim"), + null, + new ObjectMapper() + ), + shardSpecsPerInterval.get(0).getActualSpec() + ); + } + } +} diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index 375151e79c7..c9937e0d98d 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -36,6 +36,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.druid.timeline.partition.NoneShardSpec; import org.junit.Assert; import org.junit.Test; @@ -62,10 +63,20 @@ public class HadoopDruidIndexerConfigTest List shardSpecs = new ArrayList<>(); final int partitionCount = 10; for (int i = 0; i < partitionCount; i++) { - shardSpecs.add(new HadoopyShardSpec( - new HashBasedNumberedShardSpec(i, partitionCount, i, partitionCount, null, new DefaultObjectMapper()), - i - )); + shardSpecs.add( + new HadoopyShardSpec( + new HashBasedNumberedShardSpec( + i, + partitionCount, + i, + partitionCount, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new DefaultObjectMapper() + ), + i + ) + ); } HadoopIngestionSpec spec = new HadoopIngestionSpecBuilder() diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index b804db36747..cd93236f121 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; @@ -559,14 +560,17 @@ public class IndexGeneratorJobTest List specs = new ArrayList<>(); if ("hashed".equals(partitionType)) { for (Integer[] shardInfo : (Integer[][]) shardInfoForEachShard) { - specs.add(new HashBasedNumberedShardSpec( - shardInfo[0], - shardInfo[1], - shardInfo[0], - shardInfo[1], - null, - HadoopDruidIndexerConfig.JSON_MAPPER - )); + specs.add( + new HashBasedNumberedShardSpec( + shardInfo[0], + shardInfo[1], + shardInfo[0], + shardInfo[1], + null, + HashPartitionFunction.MURMUR3_32_ABS, + HadoopDruidIndexerConfig.JSON_MAPPER + ) + ); } } else if ("single".equals(partitionType)) { int partitionNum = 0; diff --git 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java index 2195ab2064d..9c614247f46 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Multimap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; @@ -37,6 +38,16 @@ public class HashedPartitionsSpecTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); + @Test + public void testDefaultValues() + { + final HashedPartitionsSpec defaultSpec = HashedPartitionsSpec.defaultSpec(); + Assert.assertEquals(Collections.emptyList(), defaultSpec.getPartitionDimensions()); + Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, defaultSpec.getPartitionFunction()); + Assert.assertNull(defaultSpec.getNumShards()); + Assert.assertEquals(PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT, defaultSpec.getMaxRowsPerSegment().intValue()); + } + @Test public void havingTargetRowsPerSegmentOnly() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java index 6ba5ad3f7d5..f6759ecd272 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java @@ -44,6 +44,7 @@ import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitioner; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -194,7 +195,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask Map intervalToCardinalities = new HashMap<>(); while (inputRowIterator.hasNext()) { InputRow inputRow = inputRowIterator.next(); - //noinspection ConstantConditions (null rows are filtered out by FilteringCloseableInputRowIterator + // null rows are filtered out by FilteringCloseableInputRowIterator DateTime timestamp = inputRow.getTimestamp(); //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals) Interval interval = granularitySpec.bucketInterval(timestamp).get(); @@ -202,11 +203,9 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask HllSketch hllSketch = intervalToCardinalities.computeIfAbsent( interval, - (intervalKey) -> { - return DimensionCardinalityReport.createHllSketchForReport(); - } + (intervalKey) -> DimensionCardinalityReport.createHllSketchForReport() ); - List groupKey = HashBasedNumberedShardSpec.getGroupKey( + List groupKey = HashPartitioner.extractKeys( partitionDimensions, queryGranularity.bucketStart(timestamp).getMillis(), inputRow 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java index 5773f095c10..cdd213496f8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/HashPartitionAnalysis.java @@ -103,6 +103,7 @@ public class HashPartitionAnalysis implements CompletePartitionAnalysis segments = runTask(indexTask).rhs; + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); + Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); + Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); + Assert.assertEquals( + HashPartitionFunction.MURMUR3_32_ABS, + ((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction() + ); } @Test @@ -520,6 +574,8 @@ public class IndexTaskTest extends IngestionTestBase Assert.assertEquals("test", segment.getDataSource()); Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval()); Assert.assertEquals(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); + final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec(); + Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, hashBasedNumberedShardSpec.getPartitionFunction()); final File segmentFile = segmentLoader.getSegmentFiles(segment); @@ -540,17 +596,15 @@ public class IndexTaskTest extends IngestionTestBase .map(cursor -> { final DimensionSelector selector = cursor.getColumnSelectorFactory() .makeDimensionSelector(new DefaultDimensionSpec("dim", "dim")); - try { - final int hash = HashBasedNumberedShardSpec.hash( - jsonMapper, - Collections.singletonList(selector.getObject()) - ); - cursor.advance(); - return hash; - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + final int hash = HashPartitionFunction.MURMUR3_32_ABS.hash( + HashBasedNumberedShardSpec.serializeGroupKey( + jsonMapper, + Collections.singletonList(selector.getObject()) + ), + hashBasedNumberedShardSpec.getNumBuckets() + ); + cursor.advance(); + return hash; }) .toList(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java index 0818605fc8e..0883634d8e3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ShardSpecsTest.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; import org.apache.druid.timeline.partition.HashBucketShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; import org.junit.Assert; @@ -52,8 +53,8 @@ public class ShardSpecsTest extends IngestionTestBase @Test public void testShardSpecSelectionWithNullPartitionDimension() { - HashBucketShardSpec spec1 = new 
HashBucketShardSpec(0, 2, null, jsonMapper); - HashBucketShardSpec spec2 = new HashBucketShardSpec(1, 2, null, jsonMapper); + HashBucketShardSpec spec1 = new HashBucketShardSpec(0, 2, null, HashPartitionFunction.MURMUR3_32_ABS, jsonMapper); + HashBucketShardSpec spec2 = new HashBucketShardSpec(1, 2, null, HashPartitionFunction.MURMUR3_32_ABS, jsonMapper); Map>> shardSpecMap = new HashMap<>(); shardSpecMap.put(Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"), ImmutableList.of(spec1, spec2)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java index ffeab43f60d..276b9addd8f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.segment.TestHelper; import org.apache.druid.timeline.partition.HashBucketShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,7 @@ public class GenericPartitionStatTest ParallelIndexTestingFactory.PARTITION_ID, ParallelIndexTestingFactory.PARTITION_ID + 1, Collections.singletonList("dim"), + HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper() ), ParallelIndexTestingFactory.NUM_ROWS, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java index 8bc6c14daa9..26895844426 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; import org.junit.Assert; @@ -156,6 +157,17 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh assertHashedPartition(publishedSegments, expectedSegmentCount); } + @Test + public void testRunWithHashPartitionFunction() throws Exception + { + final Set publishedSegments = runTestTask( + new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"), HashPartitionFunction.MURMUR3_32_ABS), + TaskState.SUCCESS, + false + ); + assertHashedPartition(publishedSegments, 2); + } + @Test public void testAppendLinearlyPartitionedSegmensToHashPartitionedDatasourceSuccessfullyAppend() { @@ -259,12 +271,27 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh for (List segmentsInInterval : intervalToSegments.values()) { Assert.assertEquals(expectedNumSegments, segmentsInInterval.size()); for 
(DataSegment segment : segmentsInInterval) { + Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); + final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec(); + Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, shardSpec.getPartitionFunction()); List results = querySegment(segment, ImmutableList.of("dim1", "dim2"), tempSegmentDir); - final int hash = HashBasedNumberedShardSpec.hash(getObjectMapper(), (List) results.get(0).getEvents()); + final int hash = shardSpec.getPartitionFunction().hash( + HashBasedNumberedShardSpec.serializeGroupKey( + getObjectMapper(), + (List) results.get(0).getEvents() + ), + shardSpec.getNumBuckets() + ); for (ScanResultValue value : results) { Assert.assertEquals( hash, - HashBasedNumberedShardSpec.hash(getObjectMapper(), (List) value.getEvents()) + shardSpec.getPartitionFunction().hash( + HashBasedNumberedShardSpec.serializeGroupKey( + getObjectMapper(), + (List) value.getEvents() + ), + shardSpec.getNumBuckets() + ) ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 19fdc008eb4..60c286a4b29 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Interval; @@ -132,7 +133,14 @@ public class ParallelIndexSupervisorTaskTest false, "subTaskId", createInterval(id), - new BuildingHashBasedNumberedShardSpec(id, id, id + 1, null, new ObjectMapper()) + new BuildingHashBasedNumberedShardSpec( + id, + id, + id + 1, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index de7ee8b6cc4..931fd24aeb1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -45,6 +45,7 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.easymock.EasyMock; import org.joda.time.Duration; import org.joda.time.Interval; @@ -105,6 +106,7 @@ class ParallelIndexTestingFactory PARTITION_ID, PARTITION_ID + 1, Collections.singletonList("dim"), + HashPartitionFunction.MURMUR3_32_ABS, 
ParallelIndexTestingFactory.NESTED_OBJECT_MAPPER ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java index 98912b402f7..af4cdf31107 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java @@ -88,6 +88,7 @@ public class PerfectRollupWorkerTaskTest 1, null, null, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index ae8fc3b64fe..6cb19edd3a1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -971,8 +971,8 @@ public class TaskLockboxTest final Task task = NoopTask.create(); lockbox.add(task); - allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 1, 3)); - allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 3, 5)); + allocateSegmentsAndAssert(task, "seq", 3, new HashBasedNumberedPartialShardSpec(null, 1, 3, null)); + allocateSegmentsAndAssert(task, "seq2", 5, new HashBasedNumberedPartialShardSpec(null, 3, 5, null)); } private void allocateSegmentsAndAssert( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java index 23b8ac04047..898c7d58845 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractITBatchIndexTest; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -74,6 +75,7 @@ public class ITHadoopIndexTest extends AbstractITBatchIndexTest {new HashedPartitionsSpec(3, null, null)}, {new HashedPartitionsSpec(null, 3, ImmutableList.of("page"))}, {new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user"))}, + {new HashedPartitionsSpec(null, 3, ImmutableList.of("page"), HashPartitionFunction.MURMUR3_32_ABS)}, {new SingleDimensionPartitionsSpec(1000, null, null, false)}, {new SingleDimensionPartitionsSpec(1000, null, "page", false)}, {new SingleDimensionPartitionsSpec(1000, null, null, true)}, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java index c49d5a90f62..fef886dda2f 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java @@ -26,6 +26,7 @@ import 
org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; @@ -52,7 +53,8 @@ public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest public static Object[][] resources() { return new Object[][]{ - {new HashedPartitionsSpec(null, 2, null)}, + {new HashedPartitionsSpec(null, 2, null, HashPartitionFunction.MURMUR3_32_ABS)}, + {new HashedPartitionsSpec(null, 2, null, null)}, {new SingleDimensionPartitionsSpec(2, null, "namespace", false)} }; } diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index b4af6a5f08e..5737e78dece 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -57,6 +57,7 @@ public class QueryContexts public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY = "numRetriesOnMissingSegments"; public static final String RETURN_PARTIAL_RESULTS_KEY = "returnPartialResults"; public static final String USE_CACHE_KEY = "useCache"; + public static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; @@ -74,6 +75,7 @@ public class QueryContexts public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false; public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000; public static final boolean DEFAULT_USE_FILTER_CNF = false; + public static final boolean DEFAULT_SECONDARY_PARTITION_PRUNING = true; @SuppressWarnings("unused") // Used by Jackson serialization public enum Vectorize @@ -278,6 +280,10 @@ public class QueryContexts return parseBoolean(query, JOIN_FILTER_REWRITE_ENABLE_KEY, DEFAULT_ENABLE_JOIN_FILTER_REWRITE); } + public static boolean isSecondaryPartitionPruningEnabled(Query query) + { + return parseBoolean(query, SECONDARY_PARTITION_PRUNING_KEY, DEFAULT_SECONDARY_PARTITION_PRUNING); + } public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) { diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 49df451fb1e..41ffba39a2b 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -107,4 +107,28 @@ public class QueryContextsTest QueryContexts.withMaxScatterGatherBytes(query, 100); } + + @Test + public void testDisableSegmentPruning() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + ImmutableMap.of(QueryContexts.SECONDARY_PARTITION_PRUNING_KEY, false) + ); + Assert.assertFalse(QueryContexts.isSecondaryPartitionPruningEnabled(query)); + } + + @Test + public void testDefaultSegmentPruning() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + ImmutableMap.of() + ); + 
Assert.assertTrue(QueryContexts.isSecondaryPartitionPruningEnabled(query)); + } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 8dba7de3d3d..119d2f99e51 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.RangeSet; +import com.google.common.collect.Sets; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.inject.Inject; @@ -423,12 +424,17 @@ public class CachingClusteredClient implements QuerySegmentWalker final Map>> dimensionRangeCache = new HashMap<>(); // Filter unneeded chunks based on partition dimension for (TimelineObjectHolder holder : serversLookup) { - final Set> filteredChunks = DimFilterUtils.filterShards( - query.getFilter(), - holder.getObject(), - partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(), - dimensionRangeCache - ); + final Set> filteredChunks; + if (QueryContexts.isSecondaryPartitionPruningEnabled(query)) { + filteredChunks = DimFilterUtils.filterShards( + query.getFilter(), + holder.getObject(), + partitionChunk -> partitionChunk.getObject().getSegment().getShardSpec(), + dimensionRangeCache + ); + } else { + filteredChunks = Sets.newHashSet(holder.getObject()); + } for (PartitionChunk chunk : filteredChunks) { ServerSelector server = chunk.getObject(); final SegmentDescriptor segment = new SegmentDescriptor( diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 177a5a01e2e..73ccdf4cd5f 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -72,6 +72,7 @@ import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChestWarehouse; @@ -106,6 +107,7 @@ import org.apache.druid.query.search.SearchQueryQueryToolChest; import org.apache.druid.query.search.SearchResultValue; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.query.timeboundary.TimeBoundaryResultValue; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -127,6 +129,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedPartitionChunk; import org.apache.druid.timeline.partition.ShardSpec; @@ -152,17 +155,21 @@ import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.stream.IntStream; /** * @@ -1544,7 +1551,7 @@ public class CachingClusteredClientTest } @Test - public void testHashBasedPruning() + public void testHashBasedPruningQueryContextEnabledWithPartitionFunctionAndPartitionDimensionsDoSegmentPruning() { DimFilter filter = new AndDimFilter( new SelectorDimFilter("dim1", "a", null), @@ -1577,23 +1584,101 @@ public class CachingClusteredClientTest final DruidServer lastServer = servers[random.nextInt(servers.length)]; List partitionDimensions1 = ImmutableList.of("dim1"); - ServerSelector selector1 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 0, 6); - ServerSelector selector2 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 1, 6); - ServerSelector selector3 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 2, 6); - ServerSelector selector4 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 3, 6); - ServerSelector selector5 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 4, 6); - ServerSelector selector6 = makeMockHashBasedSelector(lastServer, partitionDimensions1, 5, 6); + ServerSelector selector1 = makeMockHashBasedSelector( + lastServer, + partitionDimensions1, + HashPartitionFunction.MURMUR3_32_ABS, + 0, + 6 + ); + ServerSelector selector2 = makeMockHashBasedSelector( + lastServer, + partitionDimensions1, + HashPartitionFunction.MURMUR3_32_ABS, + 1, + 6 + ); + ServerSelector selector3 = makeMockHashBasedSelector( + lastServer, + partitionDimensions1, + HashPartitionFunction.MURMUR3_32_ABS, + 2, + 6 + ); + ServerSelector selector4 = makeMockHashBasedSelector( + lastServer, + partitionDimensions1, + HashPartitionFunction.MURMUR3_32_ABS, + 3, + 6 + ); + ServerSelector selector5 = makeMockHashBasedSelector( + lastServer, + partitionDimensions1, + HashPartitionFunction.MURMUR3_32_ABS, + 4, + 6 + ); + ServerSelector selector6 = makeMockHashBasedSelector( + lastServer, + partitionDimensions1, + HashPartitionFunction.MURMUR3_32_ABS, + 5, + 6 + ); List partitionDimensions2 = ImmutableList.of("dim2"); - ServerSelector selector7 = makeMockHashBasedSelector(lastServer, partitionDimensions2, 0, 3); - ServerSelector selector8 = makeMockHashBasedSelector(lastServer, partitionDimensions2, 1, 3); - ServerSelector selector9 = makeMockHashBasedSelector(lastServer, partitionDimensions2, 2, 3); + ServerSelector selector7 = makeMockHashBasedSelector( + lastServer, + partitionDimensions2, + HashPartitionFunction.MURMUR3_32_ABS, + 0, + 3 + ); + ServerSelector selector8 = makeMockHashBasedSelector( + lastServer, + partitionDimensions2, + HashPartitionFunction.MURMUR3_32_ABS, + 1, + 3 + ); + ServerSelector selector9 = makeMockHashBasedSelector( + lastServer, + partitionDimensions2, + HashPartitionFunction.MURMUR3_32_ABS, + 2, + 3 + ); List partitionDimensions3 = ImmutableList.of("dim1", "dim3"); - ServerSelector selector10 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 0, 4); - ServerSelector selector11 = makeMockHashBasedSelector(lastServer, 
partitionDimensions3, 1, 4); - ServerSelector selector12 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 2, 4); - ServerSelector selector13 = makeMockHashBasedSelector(lastServer, partitionDimensions3, 3, 4); + ServerSelector selector10 = makeMockHashBasedSelector( + lastServer, + partitionDimensions3, + HashPartitionFunction.MURMUR3_32_ABS, + 0, + 4 + ); + ServerSelector selector11 = makeMockHashBasedSelector( + lastServer, + partitionDimensions3, + HashPartitionFunction.MURMUR3_32_ABS, + 1, + 4 + ); + ServerSelector selector12 = makeMockHashBasedSelector( + lastServer, + partitionDimensions3, + HashPartitionFunction.MURMUR3_32_ABS, + 2, + 4 + ); + ServerSelector selector13 = makeMockHashBasedSelector( + lastServer, + partitionDimensions3, + HashPartitionFunction.MURMUR3_32_ABS, + 3, + 4 + ); timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 6, selector1)); timeline.add(interval1, "v", new NumberedPartitionChunk<>(1, 6, selector2)); @@ -1640,9 +1725,133 @@ public class CachingClusteredClientTest Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec()); } + @Test + public void testHashBasedPruningQueryContextDisabledNoSegmentPruning() + { + testNoSegmentPruningForHashPartitionedSegments(false, HashPartitionFunction.MURMUR3_32_ABS, false); + } + + @Test + public void testHashBasedPruningWithoutPartitionFunctionNoSegmentPruning() + { + testNoSegmentPruningForHashPartitionedSegments(true, null, false); + } + + @Test + public void testHashBasedPruningWithEmptyPartitionDimensionsNoSegmentPruning() + { + testNoSegmentPruningForHashPartitionedSegments(true, HashPartitionFunction.MURMUR3_32_ABS, true); + } + + private void testNoSegmentPruningForHashPartitionedSegments( + boolean enableSegmentPruning, + @Nullable HashPartitionFunction partitionFunction, + boolean useEmptyPartitionDimensions + ) + { + DimFilter filter = new AndDimFilter( + new SelectorDimFilter("dim1", "a", null), + new BoundDimFilter("dim2", "e", "zzz", true, true, false, null, StringComparators.LEXICOGRAPHIC), + // Equivalent filter of dim3 below is InDimFilter("dim3", Arrays.asList("c"), null) + new AndDimFilter( + new InDimFilter("dim3", Arrays.asList("a", "c", "e", "g"), null), + new BoundDimFilter("dim3", "aaa", "ddd", false, false, false, null, StringComparators.LEXICOGRAPHIC) + ) + ); + + final Map context = new HashMap<>(CONTEXT); + context.put(QueryContexts.SECONDARY_PARTITION_PRUNING_KEY, enableSegmentPruning); + final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() + .dataSource(DATA_SOURCE) + .filters(filter) + .granularity(GRANULARITY) + .intervals(SEG_SPEC) + .intervals("2011-01-05/2011-01-10") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .context(context) + .randomQueryId(); + + TimeseriesQuery query = builder.build(); + + QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest()); + + final Interval interval1 = Intervals.of("2011-01-06/2011-01-07"); + final Interval interval2 = Intervals.of("2011-01-07/2011-01-08"); + final Interval interval3 = Intervals.of("2011-01-08/2011-01-09"); + + final DruidServer lastServer = servers[random.nextInt(servers.length)]; + List partitionDimensions = useEmptyPartitionDimensions ? 
ImmutableList.of() : ImmutableList.of("dim1"); + final int numPartitions1 = 6; + for (int i = 0; i < numPartitions1; i++) { + ServerSelector selector = makeMockHashBasedSelector( + lastServer, + partitionDimensions, + partitionFunction, + i, + numPartitions1 + ); + timeline.add(interval1, "v", new NumberedPartitionChunk<>(i, numPartitions1, selector)); + } + + partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim2"); + final int numPartitions2 = 3; + for (int i = 0; i < numPartitions2; i++) { + ServerSelector selector = makeMockHashBasedSelector( + lastServer, + partitionDimensions, + partitionFunction, + i, + numPartitions2 + ); + timeline.add(interval2, "v", new NumberedPartitionChunk<>(i, numPartitions2, selector)); + } + + partitionDimensions = useEmptyPartitionDimensions ? ImmutableList.of() : ImmutableList.of("dim1", "dim3"); + final int numPartitions3 = 4; + for (int i = 0; i < numPartitions3; i++) { + ServerSelector selector = makeMockHashBasedSelector( + lastServer, + partitionDimensions, + partitionFunction, + i, + numPartitions3 + ); + timeline.add(interval3, "v", new NumberedPartitionChunk<>(i, numPartitions3, selector)); + } + + final Capture capture = Capture.newInstance(); + final Capture contextCap = Capture.newInstance(); + + QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); + EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap))) + .andReturn(Sequences.empty()) + .anyTimes(); + EasyMock.expect(serverView.getQueryRunner(lastServer)) + .andReturn(mockRunner) + .anyTimes(); + EasyMock.replay(serverView); + EasyMock.replay(mockRunner); + + // Expected to read all segments + Set expectedDescriptors = new HashSet<>(); + IntStream.range(0, numPartitions1).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval1, "v", i))); + IntStream.range(0, numPartitions2).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval2, "v", i))); + IntStream.range(0, numPartitions3).forEach(i -> expectedDescriptors.add(new SegmentDescriptor(interval3, "v", i))); + + runner.run(QueryPlus.wrap(query)).toList(); + QuerySegmentSpec querySegmentSpec = ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec(); + Assert.assertSame(MultipleSpecificSegmentSpec.class, querySegmentSpec.getClass()); + final Set actualDescriptors = new HashSet<>( + ((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors() + ); + Assert.assertEquals(expectedDescriptors, actualDescriptors); + } + private ServerSelector makeMockHashBasedSelector( DruidServer server, List partitionDimensions, + @Nullable HashPartitionFunction partitionFunction, int partitionNum, int partitions ) @@ -1658,6 +1867,7 @@ public class CachingClusteredClientTest partitionNum, partitions, partitionDimensions, + partitionFunction, ServerTestHelper.MAPPER ), null, diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index ebe82a0b140..2a7b5c4d473 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -1079,7 +1079,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest @Test public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IOException { - final PartialShardSpec partialShardSpec = new
HashBasedNumberedPartialShardSpec(null, 2, 5); + final PartialShardSpec partialShardSpec = new HashBasedNumberedPartialShardSpec(null, 2, 5, null); final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); @@ -1150,7 +1150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest "seq3", null, interval, - new HashBasedNumberedPartialShardSpec(null, 2, 3), + new HashBasedNumberedPartialShardSpec(null, 2, 3, null), "version", true ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java index 1f2af729880..4ace675e68e 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java @@ -29,6 +29,7 @@ import org.apache.druid.timeline.partition.BuildingNumberedShardSpec; import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.HashBucketShardSpec; +import org.apache.druid.timeline.partition.HashPartitionFunction; import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionIds; @@ -103,9 +104,36 @@ public class SegmentPublisherHelperTest public void testAnnotateCorePartitionSetSizeForHashNumberedShardSpec() { final Set segments = ImmutableSet.of( - newSegment(new BuildingHashBasedNumberedShardSpec(0, 0, 3, null, new ObjectMapper())), - newSegment(new BuildingHashBasedNumberedShardSpec(1, 1, 3, null, new ObjectMapper())), - newSegment(new BuildingHashBasedNumberedShardSpec(2, 2, 3, null, new ObjectMapper())) + newSegment( + new BuildingHashBasedNumberedShardSpec( + 0, + 0, + 3, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ) + ), + newSegment( + new BuildingHashBasedNumberedShardSpec( + 1, + 1, + 3, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ) + ), + newSegment( + new BuildingHashBasedNumberedShardSpec( + 2, + 2, + 3, + null, + HashPartitionFunction.MURMUR3_32_ABS, + new ObjectMapper() + ) + ) ); final Set annotated = SegmentPublisherHelper.annotateShardSpec(segments); for (DataSegment segment : annotated) { @@ -147,9 +175,9 @@ public class SegmentPublisherHelperTest public void testAnnotateShardSpecThrowingExceptionForBucketNumberedShardSpec() { final Set segments = ImmutableSet.of( - newSegment(new HashBucketShardSpec(0, 3, null, new ObjectMapper())), - newSegment(new HashBucketShardSpec(1, 3, null, new ObjectMapper())), - newSegment(new HashBucketShardSpec(2, 3, null, new ObjectMapper())) + newSegment(new HashBucketShardSpec(0, 3, null, HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper())), + newSegment(new HashBucketShardSpec(1, 3, null, HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper())), + newSegment(new HashBucketShardSpec(2, 3, null, HashPartitionFunction.MURMUR3_32_ABS, new ObjectMapper())) ); expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Cannot publish segments with shardSpec"); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 
20512316840..8d9b2943b60 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -121,6 +121,7 @@ public class CompactSegmentsTest bucketId, numBuckets, ImmutableList.of("dim"), + null, JSON_MAPPER ) }, diff --git a/website/.spelling b/website/.spelling index 9c3f1a0db65..c2cea43119a 100644 --- a/website/.spelling +++ b/website/.spelling @@ -306,6 +306,7 @@ numerics parameterized parseable partitioner +partitionFunction partitionsSpec performant plaintext @@ -361,6 +362,7 @@ rsync runtime schemas searchable +secondaryPartitionPruning seekable-stream servlet simple-client-sslcontext
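
For intuition about the new partitionFunction field, the sketch below (not Druid code; it only assumes Guava's Hashing.murmur3_32()) shows how a murmur3_32_abs-style function can map a serialized group key to a bucket id. The class and method names (Murmur3AbsBucketSketch, bucket) are illustrative, not part of Druid's HashPartitionFunction API.

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

// Illustrative only: a standalone approximation of a murmur3_32_abs-style bucket function.
public class Murmur3AbsBucketSketch
{
  // Maps a serialized group key to a bucket id in [0, numBuckets).
  static int bucket(byte[] serializedGroupKey, int numBuckets)
  {
    // abs(murmur3_32(key) % numBuckets); assumed to mirror the murmur3_32_abs idea.
    return Math.abs(Hashing.murmur3_32().hashBytes(serializedGroupKey).asInt() % numBuckets);
  }

  public static void main(String[] args)
  {
    byte[] key = "[\"a\"]".getBytes(StandardCharsets.UTF_8);
    System.out.println(bucket(key, 6)); // prints a bucket id between 0 and 5
  }
}

Recording the function name in the shard spec is what lets the broker recompute the same bucket for a filter value and prune safely; when no function is stored, the broker cannot assume which function produced the buckets and must read every partition, which is the behavior the new "no pruning" tests assert.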
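
The broker-side gate added to CachingClusteredClient is driven by a query context flag. Below is a minimal sketch of how a caller could set and read that flag; it assumes the key name secondaryPartitionPruning (the entry added to query-context.md and .spelling in this patch) and a default of true, and it uses a plain map rather than Druid's QueryContexts helpers.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: toggling the secondary partition pruning flag on a query context map.
public class SecondaryPartitionPruningContextSketch
{
  // Assumed key name, matching the secondaryPartitionPruning entry introduced by this patch.
  static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning";

  // Returns a copy of the base context with the pruning flag set.
  static Map<String, Object> withPruning(Map<String, Object> baseContext, boolean enabled)
  {
    Map<String, Object> context = new HashMap<>(baseContext);
    context.put(SECONDARY_PARTITION_PRUNING_KEY, enabled);
    return context;
  }

  // Mirrors the broker-side gate: prune by shard spec only when the flag is true (assumed default: true).
  static boolean isPruningEnabled(Map<String, Object> context)
  {
    Object value = context.get(SECONDARY_PARTITION_PRUNING_KEY);
    return value == null || Boolean.parseBoolean(value.toString());
  }
}

Passing withPruning(baseContext, false) as the query context forces the broker to fan out to every partition chunk, which is the situation exercised by testHashBasedPruningQueryContextDisabledNoSegmentPruning above.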