Merge pull request #406 from metamx/refactor-partitionsSpec

hashed partitions improvements
fjy 2014-02-25 16:44:08 -07:00
commit ff7cc79752
6 changed files with 114 additions and 13 deletions

View File

@ -82,7 +82,7 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"segmentOutputPath": "s3n:\/\/billy-bucket\/the\/segments\/go\/here",
"leaveIntermediate": "false",
"partitionsSpec": {
"type": "random"
"type": "hashed"
"targetPartitionSize": 5000000
},
"updaterJobSpec": {
@ -147,13 +147,15 @@ The indexing process has the ability to roll data up as it processes the incomin
### Partitioning specification
 Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type.
-Druid supports two types of partitions spec - singleDimension and random.
+Druid supports two types of partitions spec - singleDimension and hashed.
 In SingleDimension partition type data is partitioned based on the values in that dimension.
 For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
-In random partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
-Random partition type is more efficient and gives better distribution of data.
+In hashed partition type, the number of partitions is determined based on the targetPartitionSize and the cardinality of the input set, and the data is partitioned based on the hashcode of the row.
+It is recommended to use hashed partitioning, as it is more efficient than singleDimension since it does not need to determine a dimension for creating partitions.
+Hashing also gives a better distribution of data, resulting in equally sized partitions and improved query performance.
 To use this option, the indexer must be given a target partition size. It can then find a good set of partition ranges on its own.
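
To make the arithmetic concrete, here is a minimal sketch, assuming a hypothetical class name and row format (this is not Druid code), of how a hashed spec can turn a cardinality estimate into shard assignments:

    public class HashedPartitionSketch
    {
      public static void main(String[] args)
      {
        long targetPartitionSize = 5000000L;   // from the partitionsSpec example above
        long estimatedCardinality = 12000000L; // e.g. from a HyperLogLog pass over the input

        // Enough shards that each holds at most targetPartitionSize rows: ceil(12M / 5M) = 3.
        int numShards = (int) Math.ceil((double) estimatedCardinality / targetPartitionSize);

        // Route each row by the hash of its contents; mask the sign bit so the index is non-negative.
        String row = "2014-02-25T00:00:00Z|last_name=Smith|1";
        int shard = (row.hashCode() & Integer.MAX_VALUE) % numShards;
        System.out.println("row -> shard " + shard + " of " + numShards);
      }
    }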

View File: DetermineHashedPartitionsJob.java

@ -99,7 +99,9 @@ public class DetermineHashedPartitionsJob implements Jobby
     groupByJob.setOutputKeyClass(NullWritable.class);
     groupByJob.setOutputValueClass(NullWritable.class);
     groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+    if (!config.getSegmentGranularIntervals().isPresent()) {
+      groupByJob.setNumReduceTasks(1);
+    }
     JobHelper.setupClasspath(config, groupByJob);
     config.addInputPaths(groupByJob);
@ -294,7 +296,7 @@ public class DetermineHashedPartitionsJob implements Jobby
   {
     HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
     for (BytesWritable value : values) {
-      HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
+      HyperLogLog logValue = HyperLogLog.Builder.build(getDataBytes(value));
       try {
         aggregate.addAll(logValue);
       }
@ -324,6 +326,13 @@ public class DetermineHashedPartitionsJob implements Jobby
     }
   }

+  private byte[] getDataBytes(BytesWritable writable)
+  {
+    byte[] rv = new byte[writable.getLength()];
+    System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
+    return rv;
+  }
+
   @Override
   public void run(Context context)
       throws IOException, InterruptedException
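
The getDataBytes helper added above guards against a classic Hadoop pitfall: BytesWritable.getBytes() returns the whole backing buffer, which can be longer than the valid payload when the framework reuses the writable between records, so passing it straight to HyperLogLog.Builder.build can include stale trailing bytes. A minimal illustration of the problem (the demo class is ours; Arrays.copyOf performs the same copy as the System.arraycopy in the patch):

    import java.util.Arrays;
    import org.apache.hadoop.io.BytesWritable;

    public class BytesWritablePitfall
    {
      public static void main(String[] args)
      {
        // Simulate Hadoop reusing a writable: an 8-byte payload followed by a 3-byte one.
        BytesWritable writable = new BytesWritable(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
        writable.setSize(3); // the valid payload is now 3 bytes, but the buffer still holds 8

        byte[] raw = writable.getBytes();                        // backing buffer, stale bytes included
        byte[] valid = Arrays.copyOf(raw, writable.getLength()); // only the real payload

        System.out.println("backing buffer: " + raw.length + " bytes"); // 8
        System.out.println("valid payload:  " + valid.length + " bytes"); // 3
      }
    }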

View File: HashedPartitionsSpec.java (new file)

@ -0,0 +1,47 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexer.partitions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.indexer.DetermineHashedPartitionsJob;
+import io.druid.indexer.HadoopDruidIndexerConfig;
+import io.druid.indexer.Jobby;
+
+import javax.annotation.Nullable;
+
+public class HashedPartitionsSpec extends AbstractPartitionsSpec
+{
+  @JsonCreator
+  public HashedPartitionsSpec(
+      @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+      @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+  )
+  {
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped);
+  }
+
+  @Override
+  public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
+  {
+    return new DetermineHashedPartitionsJob(config);
+  }
+}
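
The new class is deliberately thin: the size bookkeeping lives in AbstractPartitionsSpec, and its only real job is handing the indexer a DetermineHashedPartitionsJob. A sketch of how a caller might use it (the driver method is hypothetical, and it assumes Jobby.run() returns a success flag, as it does in this era of the codebase):

    import io.druid.indexer.HadoopDruidIndexerConfig;
    import io.druid.indexer.Jobby;
    import io.druid.indexer.partitions.HashedPartitionsSpec;

    public class PartitionJobSketch
    {
      // The caller never needs to know which concrete Jobby the spec produces.
      public static boolean runDeterminePartitions(HadoopDruidIndexerConfig config)
      {
        HashedPartitionsSpec spec = new HashedPartitionsSpec(5000000L, null, null);
        Jobby job = spec.getPartitionJob(config); // a DetermineHashedPartitionsJob
        return job.run();
      }
    }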

View File: PartitionsSpec.java

@ -29,7 +29,8 @@ import io.druid.indexer.Jobby;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
-    @JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class)
+    @JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class),
+    @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class)
 })
 public interface PartitionsSpec
 {
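
With this registration, Jackson picks the concrete spec class from the "type" field, and a spec with no "type" at all falls back to defaultImpl, i.e. SingleDimensionPartitionsSpec. A round-trip sketch (using a plain ObjectMapper, whereas Druid wires up its own configured mapper):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.indexer.partitions.HashedPartitionsSpec;
    import io.druid.indexer.partitions.PartitionsSpec;

    public class SpecRoundTrip
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();

        // "type":"hashed" routes to HashedPartitionsSpec via the @JsonSubTypes table above.
        PartitionsSpec spec = mapper.readValue(
            "{\"type\":\"hashed\", \"targetPartitionSize\":5000000}",
            PartitionsSpec.class
        );
        System.out.println(spec instanceof HashedPartitionsSpec); // true
      }
    }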

View File: RandomPartitionsSpec.java

@ -27,7 +27,9 @@ import io.druid.indexer.Jobby;
 import javax.annotation.Nullable;

-public class RandomPartitionsSpec extends AbstractPartitionsSpec
+// for backward compatibility
+@Deprecated
+public class RandomPartitionsSpec extends HashedPartitionsSpec
 {
   @JsonCreator
   public RandomPartitionsSpec(
@ -38,10 +40,4 @@ public class RandomPartitionsSpec extends AbstractPartitionsSpec
   {
     super(targetPartitionSize, maxPartitionSize, assumeGrouped);
   }
-
-  @Override
-  public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
-  {
-    return new DetermineHashedPartitionsJob(config);
-  }
 }
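
The net effect is wire-level backward compatibility: configs written with "type":"random" still deserialize, and since the getPartitionJob override moved up into HashedPartitionsSpec, both type names now launch the same job. A sketch under the same plain-ObjectMapper assumption as above:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.indexer.partitions.HashedPartitionsSpec;
    import io.druid.indexer.partitions.PartitionsSpec;

    public class LegacyTypeName
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();

        // Pre-existing configs keep working: "random" maps to the deprecated subclass.
        PartitionsSpec legacy = mapper.readValue(
            "{\"type\":\"random\", \"targetPartitionSize\":5000000}",
            PartitionsSpec.class
        );

        // RandomPartitionsSpec is-a HashedPartitionsSpec, so the same job gets launched.
        System.out.println(legacy instanceof HashedPartitionsSpec); // true
      }
    }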

View File: HadoopDruidIndexerConfigTest.java

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import io.druid.db.DbConnectorConfig;
 import io.druid.indexer.granularity.UniformGranularitySpec;
+import io.druid.indexer.partitions.HashedPartitionsSpec;
 import io.druid.indexer.partitions.PartitionsSpec;
 import io.druid.indexer.partitions.RandomPartitionsSpec;
 import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@ -501,6 +502,7 @@ public class HadoopDruidIndexerConfigTest
     }
   }

   @Test
   public void testRandomPartitionsSpec() throws Exception
   {
     {
       final HadoopDruidIndexerConfig cfg;
@ -543,4 +545,48 @@ public class HadoopDruidIndexerConfigTest
       Assert.assertTrue("partitionsSpec", partitionsSpec instanceof RandomPartitionsSpec);
     }
   }
+
+  @Test
+  public void testHashedPartitionsSpec() throws Exception
+  {
+    {
+      final HadoopDruidIndexerConfig cfg;
+
+      try {
+        cfg = jsonReadWriteRead(
+            "{"
+            + "\"partitionsSpec\":{"
+            + " \"targetPartitionSize\":100,"
+            + " \"type\":\"hashed\""
+            + " }"
+            + "}",
+            HadoopDruidIndexerConfig.class
+        );
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+
+      final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
+
+      Assert.assertEquals(
+          "isDeterminingPartitions",
+          partitionsSpec.isDeterminingPartitions(),
+          true
+      );
+
+      Assert.assertEquals(
+          "getTargetPartitionSize",
+          partitionsSpec.getTargetPartitionSize(),
+          100
+      );
+
+      Assert.assertEquals(
+          "getMaxPartitionSize",
+          partitionsSpec.getMaxPartitionSize(),
+          150
+      );
+
+      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
+    }
+  }
 }
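
One detail the new test pins down implicitly: the JSON sets only targetPartitionSize to 100, yet getMaxPartitionSize() is asserted to be 150, which suggests that an omitted maxPartitionSize defaults to 1.5 times the target (presumably in AbstractPartitionsSpec). A one-liner capturing that inferred relationship:

    public class DefaultMaxPartitionSize
    {
      // Mirrors the assertion above: an omitted maxPartitionSize appears to default to 1.5x the target.
      static long defaultMaxPartitionSize(long targetPartitionSize)
      {
        return (long) (targetPartitionSize * 1.5);
      }

      public static void main(String[] args)
      {
        System.out.println(defaultMaxPartitionSize(100)); // prints 150, matching the test
      }
    }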