Merge pull request #2570 from binlijin/single_dimension_partitioning

Single dimension hash-based partitioning
Himanshu 2016-03-22 11:51:06 -05:00
commit 3220b109ad
14 changed files with 140 additions and 19 deletions

View File

@@ -192,6 +192,7 @@ The configuration options are:
|type|Type of partitionSpec to be used.|"hashed"|
|targetPartitionSize|Target number of rows to include in a partition; choose a value that yields segments of roughly 500MB~1GB.|either this or numShards|
|numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it skips the step that automatically determines the number of partitions.|either this or targetPartitionSize|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards; ignored when targetPartitionSize is set.|no|
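To make the new field concrete, here is a sketch (not part of this PR) of deserializing such a spec with Druid's `DefaultObjectMapper`; the dimension name `visitor_id` is illustrative:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.jackson.DefaultObjectMapper;

class PartitionDimensionsDocSketch
{
  public static void main(String[] args) throws Exception
  {
    // "hashed" is the default partitionsSpec type (see the @JsonTypeInfo
    // defaultImpl in the PartitionsSpec diff further down).
    ObjectMapper mapper = new DefaultObjectMapper();
    PartitionsSpec spec = mapper.readValue(
        "{\"type\": \"hashed\", \"numShards\": 4,"
        + " \"partitionDimensions\": [\"visitor_id\"]}",
        PartitionsSpec.class
    );
    // Prints [visitor_id]; omitting partitionDimensions yields an empty
    // list, which means "hash on all dimensions".
    System.out.println(((HashedPartitionsSpec) spec).getPartitionDimensions());
  }
}
```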
#### Single-dimension partitioning

View File

@@ -179,6 +179,7 @@ public class DetermineHashedPartitionsJob implements Jobby
new HashBasedNumberedShardSpec(
i,
numberOfShards,
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
shardCount++

View File

@@ -67,7 +67,12 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
for (int i = 0; i < shardsPerInterval; i++) {
specs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, shardsPerInterval, HadoopDruidIndexerConfig.JSON_MAPPER),
new HashBasedNumberedShardSpec(
i,
shardsPerInterval,
config.getPartitionsSpec().getPartitionDimensions(),
HadoopDruidIndexerConfig.JSON_MAPPER
),
shardCount++
)
);

View File

@@ -20,29 +20,39 @@
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
import java.util.List;
public class HashedPartitionsSpec extends AbstractPartitionsSpec
{
private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
public static HashedPartitionsSpec makeDefaultHashedPartitionsSpec()
{
return new HashedPartitionsSpec(null, null, null, null);
return new HashedPartitionsSpec(null, null, null, null, null);
}
@JsonIgnore
private final List<String> partitionDimensions;
@JsonCreator
public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("numShards") @Nullable Integer numShards
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
}
@Override
@@ -50,4 +60,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
{
return new DetermineHashedPartitionsJob(config);
}
@Override
@JsonProperty
public List<String> getPartitionDimensions()
{
return partitionDimensions;
}
}
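A minimal sketch (assumed, not from the PR) of the defaulting behavior above; argument order follows the constructor, and the dimension name is illustrative:

```java
import com.google.common.collect.ImmutableList;
import io.druid.indexer.partitions.HashedPartitionsSpec;

class HashedSpecDefaultsSketch
{
  static void demo()
  {
    // New trailing constructor argument: partitionDimensions. Passing null
    // falls back to an empty list, i.e. the pre-patch behavior of hashing
    // on all dimensions.
    HashedPartitionsSpec explicit =
        new HashedPartitionsSpec(null, null, null, 4, ImmutableList.of("visitor_id"));
    HashedPartitionsSpec legacy =
        new HashedPartitionsSpec(null, null, null, 4, null);

    assert explicit.getPartitionDimensions().equals(ImmutableList.of("visitor_id"));
    assert legacy.getPartitionDimensions().isEmpty();
  }
}
```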

View File

@@ -26,6 +26,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import java.util.List;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
@@ -51,4 +53,6 @@ public interface PartitionsSpec
@JsonProperty
public int getNumShards();
@JsonProperty
public List<String> getPartitionDimensions();
}

View File

@@ -22,11 +22,13 @@ package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.indexer.DeterminePartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
import java.util.List;
public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
{
@@ -57,4 +59,11 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
{
return new DeterminePartitionsJob(config);
}
@Override
@JsonProperty
public List<String> getPartitionDimensions()
{
return ImmutableList.of();
}
}

View File

@@ -393,7 +393,7 @@ public class BatchDeltaIngestionTest
INTERVAL_FULL.getStart(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.JSON_MAPPER),
new HashBasedNumberedShardSpec(0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
0
)
)

View File

@@ -149,7 +149,7 @@ public class DetermineHashedPartitionsJobTest
new HadoopTuningConfig(
tmpDir.getAbsolutePath(),
null,
new HashedPartitionsSpec(targetPartitionSize, null, true, null),
new HashedPartitionsSpec(targetPartitionSize, null, true, null, null),
null,
null,
null,

View File

@@ -193,7 +193,7 @@ public class HadoopDruidIndexerConfigTest
List<HadoopyShardSpec> specs = Lists.newArrayList();
final int partitionCount = 10;
for (int i = 0; i < partitionCount; i++) {
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, null, new DefaultObjectMapper()), i));
}
HadoopIngestionSpec spec = new HadoopIngestionSpec(

View File

@@ -517,7 +517,7 @@ public class IndexGeneratorJobTest
List<ShardSpec> specs = Lists.newArrayList();
if (partitionType.equals("hashed")) {
for (Integer[] shardInfo : (Integer[][]) shardInfoForEachShard) {
specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], HadoopDruidIndexerConfig.JSON_MAPPER));
specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER));
}
} else if (partitionType.equals("single")) {
int partitionNum = 0;

View File

@@ -21,6 +21,7 @@ package io.druid.indexer.partitions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
@@ -68,6 +69,12 @@ public class HashedPartitionsSpecTest
150
);
Assert.assertEquals(
"getPartitionDimensions",
partitionsSpec.getPartitionDimensions(),
ImmutableList.of()
);
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
}
}
@@ -114,6 +121,12 @@ public class HashedPartitionsSpecTest
2
);
Assert.assertEquals(
"getPartitionDimensions",
partitionsSpec.getPartitionDimensions(),
ImmutableList.of()
);
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
}

View File

@@ -211,7 +211,7 @@ public class IndexTask extends AbstractFixedIntervalTask
if (numShards > 0) {
shardSpecs = Lists.newArrayList();
for (int i = 0; i < numShards; i++) {
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper));
}
} else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
@@ -304,7 +304,7 @@ public class IndexTask extends AbstractFixedIntervalTask
shardSpecs.add(new NoneShardSpec());
} else {
for (int i = 0; i < numberOfShards; ++i) {
shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, jsonMapper));
shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, null, jsonMapper));
}
}

View File

@@ -21,31 +21,48 @@ package io.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import javax.annotation.Nullable;
import java.util.List;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
private static final HashFunction hashFunction = Hashing.murmur3_32();
private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
private final ObjectMapper jsonMapper;
@JsonIgnore
private final List<String> partitionDimensions;
@JsonCreator
public HashBasedNumberedShardSpec(
@JsonProperty("partitionNum") int partitionNum,
@JsonProperty("partitions") int partitions,
@JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
@JacksonInject ObjectMapper jsonMapper
)
{
super(partitionNum, partitions);
this.jsonMapper = jsonMapper;
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
}
@JsonProperty("partitionDimensions")
public List<String> getPartitionDimensions()
{
return partitionDimensions;
}
@Override
@@ -56,7 +73,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
protected int hash(long timestamp, InputRow inputRow)
{
final List<Object> groupKey = Rows.toGroupKey(timestamp, inputRow);
final List<Object> groupKey = getGroupKey(timestamp, inputRow);
try {
return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
}
@@ -65,12 +82,29 @@
}
}
List<Object> getGroupKey(final long timestamp, final InputRow inputRow)
{
if (partitionDimensions.isEmpty()) {
return Rows.toGroupKey(timestamp, inputRow);
} else {
return Lists.transform(partitionDimensions, new Function<String, Object>()
{
@Override
public Object apply(final String dim)
{
return inputRow.getDimension(dim);
}
});
}
}
@Override
public String toString()
{
return "HashBasedNumberedShardSpec{" +
"partitionNum=" + getPartitionNum() +
", partitions=" + getPartitions() +
", partitionDimensions=" + getPartitionDimensions() +
'}';
}
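The upshot of getGroupKey: rows that agree on the configured partitionDimensions produce identical group keys, hash identically, and therefore route to the same shard, regardless of timestamp or other dimensions. A sketch of the hashing path (murmur3_32 over the JSON-serialized key, matching hash() above; the final shard assignment from the hash is only summarized in a comment, not reproduced):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import java.util.List;

class GroupKeyHashSketch
{
  // Same scheme as hash() above: murmur3_32 over the JSON bytes of the key.
  static int hashGroupKey(ObjectMapper mapper, List<Object> groupKey) throws Exception
  {
    return Hashing.murmur3_32().hashBytes(mapper.writeValueAsBytes(groupKey)).asInt();
  }

  static void demo(ObjectMapper mapper) throws Exception
  {
    // With partitionDimensions = ["visitor_id"], two rows sharing the value
    // "v1" yield the same group key (a list of dimension-value lists, as the
    // test below shows), so they hash to the same shard even when their
    // timestamps and other dimensions differ.
    List<Object> keyA = ImmutableList.<Object>of(ImmutableList.of("v1"));
    List<Object> keyB = ImmutableList.<Object>of(ImmutableList.of("v1"));
    assert hashGroupKey(mapper, keyA) == hashGroupKey(mapper, keyB);
    // The spec then picks the shard roughly as: hash modulo getPartitions().
  }
}
```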

View File

@@ -17,18 +17,17 @@
* under the License.
*/
package io.druid.server.shard;
package io.druid.timeline.partition;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.TestUtil;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@@ -43,11 +42,12 @@ public class HashBasedNumberedShardSpecTest
{
final ShardSpec spec = TestUtil.MAPPER.readValue(
TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, TestUtil.MAPPER)),
TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2, ImmutableList.of("visitor_id"), TestUtil.MAPPER)),
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) spec).getPartitionDimensions());
}
@Test
@@ -59,15 +59,23 @@ public class HashBasedNumberedShardSpecTest
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
final ShardSpec specWithPartitionDimensions = TestUtil.MAPPER.readValue(
"{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1, \"partitionDimensions\":[\"visitor_id\"]}",
ShardSpec.class
);
Assert.assertEquals(1, specWithPartitionDimensions.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitions());
Assert.assertEquals(ImmutableList.of("visitor_id"), ((HashBasedNumberedShardSpec) specWithPartitionDimensions).getPartitionDimensions());
}
@Test
public void testPartitionChunks()
{
final List<ShardSpec> specs = ImmutableList.<ShardSpec>of(
new HashBasedNumberedShardSpec(0, 3, TestUtil.MAPPER),
new HashBasedNumberedShardSpec(1, 3, TestUtil.MAPPER),
new HashBasedNumberedShardSpec(2, 3, TestUtil.MAPPER)
new HashBasedNumberedShardSpec(0, 3, null, TestUtil.MAPPER),
new HashBasedNumberedShardSpec(1, 3, null, TestUtil.MAPPER),
new HashBasedNumberedShardSpec(2, 3, null, TestUtil.MAPPER)
);
final List<PartitionChunk<String>> chunks = Lists.transform(
@@ -124,6 +132,35 @@
}
@Test
public void testGetGroupKey() throws Exception
{
final HashBasedNumberedShardSpec shardSpec1 = new HashBasedNumberedShardSpec(
1,
2,
ImmutableList.of("visitor_id"),
TestUtil.MAPPER
);
final DateTime time = new DateTime();
final InputRow inputRow = new MapBasedInputRow(
time,
ImmutableList.of("visitor_id", "cnt"),
ImmutableMap.<String, Object>of("visitor_id", "v1", "cnt", 10)
);
Assert.assertEquals(ImmutableList.of(Lists.newArrayList("v1")), shardSpec1.getGroupKey(time.getMillis(), inputRow));
final HashBasedNumberedShardSpec shardSpec2 = new HashBasedNumberedShardSpec(1, 2, null, TestUtil.MAPPER);
Assert.assertEquals(ImmutableList.of(
time.getMillis(),
ImmutableMap.of(
"cnt",
Lists.newArrayList(10),
"visitor_id",
Lists.newArrayList("v1")
)
).toString(), shardSpec2.getGroupKey(time.getMillis(), inputRow).toString());
}
public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
{
for (ShardSpec spec : specs) {
@@ -141,7 +178,7 @@
int partitions
)
{
super(partitionNum, partitions, TestUtil.MAPPER);
super(partitionNum, partitions, null, TestUtil.MAPPER);
}
@Override