From bce600f5d5073d9a077b3ea53cb3fd4e4d1ead58 Mon Sep 17 00:00:00 2001
From: binlijin
Date: Tue, 22 Mar 2016 13:15:33 +0800
Subject: [PATCH] Single dimension hash-based partitioning

---
 docs/content/ingestion/batch-ingestion.md      |  1 +
 .../indexer/DetermineHashedPartitionsJob.java  |  1 +
 .../HadoopDruidDetermineConfigurationJob.java  |  7 ++-
 .../partitions/HashedPartitionsSpec.java       | 21 ++++++-
 .../indexer/partitions/PartitionsSpec.java     |  4 ++
 .../SingleDimensionPartitionsSpec.java         |  9 +++
 .../indexer/BatchDeltaIngestionTest.java       |  2 +-
 .../DetermineHashedPartitionsJobTest.java      |  2 +-
 .../indexer/HadoopDruidIndexerConfigTest.java  |  2 +-
 .../druid/indexer/IndexGeneratorJobTest.java   |  2 +-
 .../partitions/HashedPartitionsSpecTest.java   | 13 +++++
 .../druid/indexing/common/task/IndexTask.java  |  4 +-
 .../partition/HashBasedNumberedShardSpec.java  | 36 +++++++++++-
 .../HashBasedNumberedShardSpecTest.java        | 55 ++++++++++++++++---
 14 files changed, 140 insertions(+), 19 deletions(-)
 rename server/src/test/java/io/druid/{server/shard => timeline/partition}/HashBasedNumberedShardSpecTest.java (71%)

diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md
index 0d0a723a3d1..d6da72f3ec0 100644
--- a/docs/content/ingestion/batch-ingestion.md
+++ b/docs/content/ingestion/batch-ingestion.md
@@ -192,6 +192,7 @@ The configuration options are:
 |type|Type of partitionSpec to be used.|"hashed"|
 |targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards|
 |numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize|
+|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards; ignored when targetPartitionSize is set.|no|
 
 #### Single-dimension partitioning
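The new `partitionDimensions` row above is the only user-facing change. For instance, to hash rows into 10 shards per interval by `visitor_id` alone, the hashed partitionsSpec might look like this (an illustrative fragment; the enclosing tuningConfig is elided and the dimension name is made up):

```json
"partitionsSpec": {
  "type": "hashed",
  "numShards": 10,
  "partitionDimensions": ["visitor_id"]
}
```

Since `partitionDimensions` only takes effect alongside `numShards`, pairing it with `targetPartitionSize` silently ignores it, as the table notes.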
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 483aad7a062..99c568770ac 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -179,6 +179,7 @@ public class DetermineHashedPartitionsJob implements Jobby
                   new HashBasedNumberedShardSpec(
                       i,
                       numberOfShards,
+                      null,
                       HadoopDruidIndexerConfig.JSON_MAPPER
                   ),
                   shardCount++
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 823b573f9f4..554e64e6aa9 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -67,7 +67,12 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
         for (int i = 0; i < shardsPerInterval; i++) {
           specs.add(
               new HadoopyShardSpec(
-                  new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.JSON_MAPPER),
+                  new HashBasedNumberedShardSpec(
+                      i,
+                      shardsPerInterval,
+                      config.getPartitionsSpec().getPartitionDimensions(),
+                      HadoopDruidIndexerConfig.JSON_MAPPER
+                  ),
                   shardCount++
               )
           );
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
index 3b7ebcf4153..7b69b0097ea 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -20,29 +20,39 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import io.druid.indexer.DetermineHashedPartitionsJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
+import java.util.List;
 
 public class HashedPartitionsSpec extends AbstractPartitionsSpec
 {
+  private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
+
   public static HashedPartitionsSpec makeDefaultHashedPartitionsSpec()
   {
-    return new HashedPartitionsSpec(null, null, null, null);
+    return new HashedPartitionsSpec(null, null, null, null, null);
   }
 
+  @JsonIgnore
+  private final List<String> partitionDimensions;
+
   @JsonCreator
   public HashedPartitionsSpec(
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
-      @JsonProperty("numShards") @Nullable Integer numShards
+      @JsonProperty("numShards") @Nullable Integer numShards,
+      @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions
   )
   {
     super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
+    this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
   }
 
   @Override
@@ -50,4 +60,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
   {
     return new DetermineHashedPartitionsJob(config);
   }
+
+  @Override
+  @JsonProperty
+  public List<String> getPartitionDimensions()
+  {
+    return partitionDimensions;
+  }
 }
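The constructor above substitutes an immutable empty list when the property is absent, so callers never see null. A minimal round-trip sketch of that defaulting behavior, assuming the indexing-hadoop classes are on the classpath (Druid's own tests use `DefaultObjectMapper`; a stock Jackson `ObjectMapper` is assumed to behave the same for this shape):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.partitions.PartitionsSpec;

public class PartitionDimensionsDefaultingSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // Explicit partitionDimensions are parsed into the spec as-is.
    final PartitionsSpec withDims = mapper.readValue(
        "{\"type\": \"hashed\", \"numShards\": 2, \"partitionDimensions\": [\"visitor_id\"]}",
        PartitionsSpec.class
    );
    System.out.println(withDims.getPartitionDimensions()); // [visitor_id]

    // An absent property collapses to the empty-list default.
    final PartitionsSpec withoutDims = mapper.readValue(
        "{\"type\": \"hashed\", \"numShards\": 2}",
        PartitionsSpec.class
    );
    System.out.println(withoutDims.getPartitionDimensions()); // []
  }
}
```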
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index fb232d58e41..099b83213a8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -26,6 +26,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.Jobby;
 
+import java.util.List;
+
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
@@ -51,4 +53,6 @@ public interface PartitionsSpec
   @JsonProperty
   public int getNumShards();
 
+  @JsonProperty
+  public List<String> getPartitionDimensions();
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 419f37a3af9..7441abc481c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -22,11 +22,13 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import io.druid.indexer.DeterminePartitionsJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
+import java.util.List;
 
 public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
 {
@@ -57,4 +59,11 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
   {
     return new DeterminePartitionsJob(config);
   }
+
+  @Override
+  @JsonProperty
+  public List<String> getPartitionDimensions()
+  {
+    return ImmutableList.of();
+  }
 }
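The `"dimension"` spec returns an empty list here because single-dimension range partitioning names its dimension through its own property rather than the hash-specific one, so the two spec types stay distinct on the wire. For contrast, an illustrative single-dimension spec (field values are made up; `partitionDimension` is that spec's own optional property):

```json
"partitionsSpec": {
  "type": "dimension",
  "targetPartitionSize": 5000000,
  "partitionDimension": "visitor_id"
}
```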
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
index 30122001791..0cdb613a761 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
@@ -393,7 +393,7 @@ public class BatchDeltaIngestionTest
         INTERVAL_FULL.getStart(),
         ImmutableList.of(
             new HadoopyShardSpec(
-                new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.JSON_MAPPER),
+                new HashBasedNumberedShardSpec(0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
                 0
             )
         )
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java
index 51696be32f5..a1f3615ed39 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -149,7 +149,7 @@ public class DetermineHashedPartitionsJobTest
         new HadoopTuningConfig(
             tmpDir.getAbsolutePath(),
             null,
-            new HashedPartitionsSpec(targetPartitionSize, null, true, null),
+            new HashedPartitionsSpec(targetPartitionSize, null, true, null, null),
             null,
             null,
             null,
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index 10abc170dea..3e9bfdd5c77 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -193,7 +193,7 @@ public class HadoopDruidIndexerConfigTest
     List<HadoopyShardSpec> specs = Lists.newArrayList();
     final int partitionCount = 10;
     for (int i = 0; i < partitionCount; i++) {
-      specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
+      specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()), i));
     }
 
     HadoopIngestionSpec spec = new HadoopIngestionSpec(
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java
index e1a5238d139..9e33be0ef7d 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java
@@ -517,7 +517,7 @@ public class IndexGeneratorJobTest
     List<ShardSpec> specs = Lists.newArrayList();
     if (partitionType.equals("hashed")) {
       for (Integer[] shardInfo : (Integer[][]) shardInfoForEachShard) {
-        specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], HadoopDruidIndexerConfig.JSON_MAPPER));
+        specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER));
       }
     } else if (partitionType.equals("single")) {
       int partitionNum = 0;
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/partitions/HashedPartitionsSpecTest.java
index ff96216b5e0..59cc90e0d49 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/partitions/HashedPartitionsSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/partitions/HashedPartitionsSpecTest.java
@@ -21,6 +21,7 @@ package io.druid.indexer.partitions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import io.druid.jackson.DefaultObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
@@ -68,6 +69,12 @@ public class HashedPartitionsSpecTest
           150
       );
 
+      Assert.assertEquals(
+          "getPartitionDimensions",
+          ImmutableList.of(),
+          partitionsSpec.getPartitionDimensions()
+      );
+
       Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
     }
   }
@@ -114,6 +121,12 @@ public class HashedPartitionsSpecTest
           2
       );
 
+      Assert.assertEquals(
+          "getPartitionDimensions",
+          ImmutableList.of(),
+          partitionsSpec.getPartitionDimensions()
+      );
+
       Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
     }
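These test updates are mechanical: every existing call site passes `null` for the new argument, and the spec tests assert that a spec parsed without the property reports an empty dimension list. (The `assertEquals` calls above put the expected value first, matching JUnit's `(message, expected, actual)` convention; the patch as submitted had the last two arguments swapped, which is harmless when the assertion passes but garbles the failure message.) When the field is set, the serialized shard spec carries it alongside the existing numbered-shard properties; the serde tests near the end of the patch exercise exactly this shape, reformatted here for readability:

```json
{
  "type": "hashed",
  "partitionNum": 1,
  "partitions": 2,
  "partitionDimensions": ["visitor_id"]
}
```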
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index eb56b37322c..c0a5d200bfd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -211,7 +211,7 @@ public class IndexTask extends AbstractFixedIntervalTask
     if (numShards > 0) {
       shardSpecs = Lists.newArrayList();
       for (int i = 0; i < numShards; i++) {
-        shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
+        shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper));
       }
     } else {
       shardSpecs = ImmutableList.of(new NoneShardSpec());
@@ -304,7 +304,7 @@ public class IndexTask extends AbstractFixedIntervalTask
       shardSpecs.add(new NoneShardSpec());
     } else {
       for (int i = 0; i < numberOfShards; ++i) {
-        shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, jsonMapper));
+        shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, null, jsonMapper));
       }
     }
 
diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 9095915ef86..49b55de0a2c 100644
--- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -21,31 +21,48 @@ package io.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
 
+import javax.annotation.Nullable;
 import java.util.List;
 
 public class HashBasedNumberedShardSpec extends NumberedShardSpec
 {
   private static final HashFunction hashFunction = Hashing.murmur3_32();
+  private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
+
   private final ObjectMapper jsonMapper;
+  @JsonIgnore
+  private final List<String> partitionDimensions;
 
   @JsonCreator
   public HashBasedNumberedShardSpec(
       @JsonProperty("partitionNum") int partitionNum,
       @JsonProperty("partitions") int partitions,
+      @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
       @JacksonInject ObjectMapper jsonMapper
   )
   {
     super(partitionNum, partitions);
     this.jsonMapper = jsonMapper;
+    this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
+  }
+
+  @JsonProperty("partitionDimensions")
+  public List<String> getPartitionDimensions()
+  {
+    return partitionDimensions;
   }
 
   @Override
@@ -56,7 +73,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
 
   protected int hash(long timestamp, InputRow inputRow)
   {
-    final List<Object> groupKey = Rows.toGroupKey(timestamp, inputRow);
+    final List<Object> groupKey = getGroupKey(timestamp, inputRow);
     try {
       return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
     }
@@ -65,12 +82,29 @@
     }
   }
 
+  List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
+  {
+    if (partitionDimensions.isEmpty()) {
+      return Rows.toGroupKey(timestamp, inputRow);
+    } else {
+      return Lists.transform(partitionDimensions, new Function<String, Object>()
+      {
+        @Override
+        public Object apply(final String dim)
+        {
+          return inputRow.getDimension(dim);
+        }
+      });
+    }
+  }
+
   @Override
   public String toString()
   {
     return "HashBasedNumberedShardSpec{" +
            "partitionNum=" + getPartitionNum() +
            ", partitions=" + getPartitions() +
+           ", partitionDimensions=" + getPartitionDimensions() +
            '}';
   }
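`getGroupKey` is the heart of the patch: with `partitionDimensions` set, only the named dimensions feed the murmur3 hash, so rows that share a `visitor_id` always map to the same shard regardless of timestamp; with the default empty list the behavior is unchanged, keying on the row timestamp plus all dimension values. A standalone sketch of that decision, using plain Guava collections as stand-ins for `InputRow` and `Rows.toGroupKey` (the `groupKey` helper and class name are hypothetical):

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;

public class GroupKeySketch
{
  // Mirrors the patch's getGroupKey(): key on the named dimensions only,
  // or fall back to (timestamp + all dimensions) when none are named.
  static List<Object> groupKey(List<String> partitionDimensions, long timestamp, Map<String, List<String>> dims)
  {
    if (partitionDimensions.isEmpty()) {
      return ImmutableList.<Object>of(timestamp, dims); // stand-in for Rows.toGroupKey(timestamp, inputRow)
    }
    final List<Object> key = Lists.newArrayList();
    for (String dim : partitionDimensions) {
      key.add(dims.get(dim)); // inputRow.getDimension(dim) in the patch
    }
    return key;
  }

  public static void main(String[] args)
  {
    final Map<String, List<String>> row = ImmutableMap.<String, List<String>>of(
        "visitor_id", ImmutableList.of("v1"),
        "cnt", ImmutableList.of("10")
    );
    // Dimension-keyed: [[v1]] for any timestamp, so all of v1's rows co-locate.
    System.out.println(groupKey(ImmutableList.of("visitor_id"), 1000L, row));
    // Default: [1000, {visitor_id=[v1], cnt=[10]}]; the timestamp participates again.
    System.out.println(groupKey(ImmutableList.<String>of(), 1000L, row));
  }
}
```

One wrinkle worth noting: `Lists.transform` in the patch returns a lazy view, which is fine there because the key is immediately serialized for hashing; the sketch materializes the key eagerly instead.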
diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
similarity index 71%
rename from server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
rename to server/src/test/java/io/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
index 2b15b58112a..4466f7ef737 100644
--- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
+++ b/server/src/test/java/io/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
@@ -17,18 +17,17 @@
  * under the License.
  */
 
-package io.druid.server.shard;
+package io.druid.timeline.partition;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.common.ISE;
 import io.druid.TestUtil;
 import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.Row;
-import io.druid.timeline.partition.HashBasedNumberedShardSpec;
-import io.druid.timeline.partition.PartitionChunk;
-import io.druid.timeline.partition.ShardSpec;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,11 +42,12 @@ public class HashBasedNumberedShardSpecTest
   {
     final ShardSpec spec = TestUtil.MAPPER.readValue(
-        TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, TestUtil.MAPPER)),
+        TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, ImmutableList.of("visitor_id"), TestUtil.MAPPER)),
         ShardSpec.class
     );
     Assert.assertEquals(1, spec.getPartitionNum());
     Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
+    Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) spec).getPartitionDimensions());
   }
 
@@ -59,15 +59,23 @@ public class HashBasedNumberedShardSpecTest
     );
     Assert.assertEquals(1, spec.getPartitionNum());
     Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
+
+    final ShardSpec specWithPartitionDimensions = TestUtil.MAPPER.readValue(
+        "{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1, \"partitionDimensions\":[\"visitor_id\"]}",
+        ShardSpec.class
+    );
+    Assert.assertEquals(1, specWithPartitionDimensions.getPartitionNum());
+    Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitions());
+    Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionDimensions());
   }
 
   @Test
   public void testPartitionChunks()
   {
     final List<ShardSpec> specs = ImmutableList.<ShardSpec>of(
-        new HashBasedNumberedShardSpec(0, 3, TestUtil.MAPPER),
-        new HashBasedNumberedShardSpec(1, 3, TestUtil.MAPPER),
-        new HashBasedNumberedShardSpec(2, 3, TestUtil.MAPPER)
+        new HashBasedNumberedShardSpec(0, 3, null, TestUtil.MAPPER),
+        new HashBasedNumberedShardSpec(1, 3, null, TestUtil.MAPPER),
+        new HashBasedNumberedShardSpec(2, 3, null, TestUtil.MAPPER)
     );
 
     final List<PartitionChunk<ShardSpec>> chunks = Lists.transform(
@@ -124,6 +132,35 @@ public class HashBasedNumberedShardSpecTest
 
   }
 
+  @Test
+  public void testGetGroupKey() throws Exception
+  {
+    final HashBasedNumberedShardSpec shardSpec1 = new HashBasedNumberedShardSpec(
+        1,
+        2,
+        ImmutableList.of("visitor_id"),
+        TestUtil.MAPPER
+    );
+    final DateTime time = new DateTime();
+    final InputRow inputRow = new MapBasedInputRow(
+        time,
+        ImmutableList.of("visitor_id", "cnt"),
+        ImmutableMap.<String, Object>of("visitor_id", "v1", "cnt", 10)
+    );
+    Assert.assertEquals(ImmutableList.of(Lists.newArrayList("v1")), shardSpec1.getGroupKey(time.getMillis(), inputRow));
+
+    final HashBasedNumberedShardSpec shardSpec2 = new HashBasedNumberedShardSpec(1, 2, null, TestUtil.MAPPER);
+    Assert.assertEquals(ImmutableList.of(
+        time.getMillis(),
+        ImmutableMap.of(
+            "cnt",
+            Lists.newArrayList(10),
+            "visitor_id",
+            Lists.newArrayList("v1")
+        )
+    ).toString(), shardSpec2.getGroupKey(time.getMillis(), inputRow).toString());
+  }
+
   public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
   {
     for (ShardSpec spec : specs) {
@@ -141,7 +178,7 @@ public class HashBasedNumberedShardSpecTest
         int partitions
     )
     {
-      super(partitionNum, partitions, TestUtil.MAPPER);
+      super(partitionNum, partitions, null, TestUtil.MAPPER);
     }
 
     @Override