mirror of https://github.com/apache/druid.git
review comments and add partitioner
commit 0d8c1ffe54
parent ea4a80e8d2
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -75,7 +76,7 @@ public class DbUpdaterJob implements Jobby
                 .put("created_date", new DateTime().toString())
                 .put("start", segment.getInterval().getStart().toString())
                 .put("end", segment.getInterval().getEnd().toString())
-                .put("partitioned", segment.getShardSpec().getPartitionNum())
+                .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
                 .put("version", segment.getVersion())
                 .put("used", true)
                 .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
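Both metadata writers touched by this commit now store "partitioned" as a 0/1 flag (is the segment sharded at all?) rather than the shard's partition number. A minimal sketch of that check, assuming only the DataSegment and NoneShardSpec classes already imported in the diff; the helper class and method name below are made up for illustration:

import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;

// Hypothetical helper, not part of the commit: mirrors the value written to the
// "partitioned" column by DbUpdaterJob above and DbSegmentPublisher below.
public class PartitionedFlagSketch
{
  // 0 when the segment carries NoneShardSpec (a single, unsharded segment),
  // 1 for any real shard spec, regardless of its partition number.
  public static int partitionedFlag(DataSegment segment)
  {
    return (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1;
  }
}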
@@ -37,6 +37,7 @@ import io.druid.indexer.granularity.UniformGranularitySpec;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -65,7 +67,6 @@ import java.util.Set;
  */
 public class DetermineHashedPartitionsJob implements Jobby
 {
-  private static final int MAX_SHARDS = 128;
   private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
   private final HadoopDruidIndexerConfig config;

@@ -98,6 +99,7 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputKeyClass(NullWritable.class);
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+      groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
       if (!config.getSegmentGranularIntervals().isPresent()) {
         groupByJob.setNumReduceTasks(1);
       } else {
@@ -150,14 +152,6 @@ public class DetermineHashedPartitionsJob implements Jobby
           );
           int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());

-          if (numberOfShards > MAX_SHARDS) {
-            throw new ISE(
-                "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
-                numberOfShards,
-                MAX_SHARDS
-            );
-          }
-
           List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
           if (numberOfShards == 1) {
             actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
@@ -314,13 +308,6 @@ public class DetermineHashedPartitionsJob implements Jobby
       }
     }

-    private byte[] getDataBytes(BytesWritable writable)
-    {
-      byte[] rv = new byte[writable.getLength()];
-      System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
-      return rv;
-    }
-
     @Override
     public void run(Context context)
         throws IOException, InterruptedException
@@ -348,6 +335,50 @@ public class DetermineHashedPartitionsJob implements Jobby
         }
       }
     }
+
+  public static class DetermineHashedPartitionsPartitioner
+      extends Partitioner<LongWritable, BytesWritable> implements Configurable
+  {
+    private Configuration config;
+    private boolean determineIntervals;
+    private Map<LongWritable, Integer> reducerLookup;
+
+    @Override
+    public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
+    {
+
+      if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
+        return 1;
+      } else {
+        return reducerLookup.get(interval);
+      }
+    }
+
+    @Override
+    public Configuration getConf()
+    {
+      return config;
+    }
+
+    @Override
+    public void setConf(Configuration config)
+    {
+      this.config = config;
+      HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfigBuilder.fromConfiguration(config);
+      if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+        determineIntervals = false;
+        int reducerNumber = 0;
+        ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
+        for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
+          builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
+        }
+        reducerLookup = builder.build();
+      } else {
+        determineIntervals = true;
+      }
+    }
+  }
+
 }
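The new partitioner routes every row to the reducer that owns its segment interval: setConf builds a map from each interval's start millis to a reducer index, and getPartition looks the key up (returning a fixed partition in local mode or when intervals are still being determined). A rough, self-contained sketch of just that lookup idea, using plain Java types as stand-ins for the Hadoop and Druid classes:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only, not part of the commit: interval start millis -> reducer index,
// the same shape as the partitioner's reducerLookup.
public class ReducerLookupSketch
{
  public static void main(String[] args)
  {
    long day = 24L * 60 * 60 * 1000L;
    long[] intervalStarts = {0L, day, 2 * day};   // stand-ins for segment granular intervals

    Map<Long, Integer> reducerLookup = new LinkedHashMap<>();
    int reducerNumber = 0;
    for (long start : intervalStarts) {
      reducerLookup.put(start, reducerNumber++);
    }

    // Every key carrying the same interval start maps to the same reducer,
    // so each reducer sees exactly one interval's rows.
    System.out.println(reducerLookup.get(day)); // prints 1
  }
}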
@@ -57,7 +57,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     if (config.isDeterminingPartitions()) {
       jobs.add(config.getPartitionsSpec().getPartitionJob(config));
     } else {
-      int shardsPerInterval = config.getPartitionsSpec().getShardCount();
+      int shardsPerInterval = config.getPartitionsSpec().getNumShards();
       Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
@@ -26,17 +26,16 @@ import com.google.common.base.Preconditions;
 public abstract class AbstractPartitionsSpec implements PartitionsSpec
 {
   private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
-  private static final int MAX_SHARDS = 128;
   private final long targetPartitionSize;
   private final long maxPartitionSize;
   private final boolean assumeGrouped;
-  private final int shardCount;
+  private final int numShards;

   public AbstractPartitionsSpec(
       Long targetPartitionSize,
       Long maxPartitionSize,
       Boolean assumeGrouped,
-      Integer shardCount
+      Integer numShards
   )
   {
     this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@@ -44,15 +43,11 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
         ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
         : maxPartitionSize;
     this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
-    this.shardCount = shardCount == null ? -1 : shardCount;
+    this.numShards = numShards == null ? -1 : numShards;
     Preconditions.checkArgument(
-        this.targetPartitionSize == -1 || this.shardCount == -1,
+        this.targetPartitionSize == -1 || this.numShards == -1,
        "targetPartitionsSize and shardCount both cannot be set"
     );
-    Preconditions.checkArgument(
-        this.shardCount < MAX_SHARDS,
-        "shardCount cannot be more than MAX_SHARD_COUNT[%d] ", MAX_SHARDS
-    );
   }

   @JsonProperty
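With the rename, the spec keeps only the mutual-exclusivity check: targetPartitionSize and numShards cannot both be set (the 128-shard cap is dropped here and in DetermineHashedPartitionsJob above). A small sketch of how that style of guard behaves, assuming only Guava's Preconditions; the class name and values are illustrative:

import com.google.common.base.Preconditions;

// Illustration only: the same kind of guard the constructor keeps after this change.
public class SpecGuardSketch
{
  public static void main(String[] args)
  {
    long targetPartitionSize = -1; // -1 means "not set", as in AbstractPartitionsSpec
    int numShards = 2;

    // Passes: only one of the two knobs is set.
    Preconditions.checkArgument(
        targetPartitionSize == -1 || numShards == -1,
        "targetPartitionsSize and shardCount both cannot be set"
    );

    // Setting both would throw IllegalArgumentException with the message above.
    System.out.println("spec is valid");
  }
}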
@@ -80,8 +75,8 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
   }

   @Override
-  public int getShardCount()
+  public int getNumShards()
   {
-    return shardCount;
+    return numShards;
   }
 }
@@ -34,10 +34,10 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
-      @JsonProperty("shardCount") @Nullable Integer shardCount
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }

   @Override
@@ -50,6 +50,6 @@ public interface PartitionsSpec
   public boolean isDeterminingPartitions();

   @JsonProperty
-  public int getShardCount();
+  public int getNumShards();

 }
@@ -33,9 +33,9 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
       @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
       @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
-      @JsonProperty("shardCount") @Nullable Integer shardCount
+      @JsonProperty("numShards") @Nullable Integer numShards
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, shardCount);
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
   }
 }
@@ -602,7 +602,7 @@ public class HadoopDruidIndexerConfigTest
         "{"
         + "\"partitionsSpec\":{"
         + "   \"type\":\"hashed\","
-        + "   \"shardCount\":2"
+        + "   \"numShards\":2"
         + " }"
         + "}",
         HadoopDruidIndexerConfig.class
@@ -634,7 +634,7 @@ public class HadoopDruidIndexerConfigTest

     Assert.assertEquals(
         "shardCount",
-        partitionsSpec.getShardCount(),
+        partitionsSpec.getNumShards(),
         2
     );

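The test exercises the renamed JSON property end to end: a partitionsSpec of type "hashed" now takes "numShards" instead of "shardCount". A stripped-down sketch of the same Jackson binding with a stand-in POJO; the class below is hypothetical, not Druid's:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-in for HashedPartitionsSpec, showing only the renamed property.
public class NumShardsBindingSketch
{
  public static class MiniHashedSpec
  {
    private final int numShards;

    @JsonCreator
    public MiniHashedSpec(@JsonProperty("numShards") Integer numShards)
    {
      this.numShards = numShards == null ? -1 : numShards;
    }

    public int getNumShards()
    {
      return numShards;
    }
  }

  public static void main(String[] args) throws Exception
  {
    MiniHashedSpec spec = new ObjectMapper()
        .readValue("{\"numShards\":2}", MiniHashedSpec.class);
    System.out.println(spec.getNumShards()); // prints 2
  }
}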
@@ -25,6 +25,7 @@ import com.metamx.common.logger.Logger;
 import io.druid.db.DbConnector;
 import io.druid.db.DbTablesConfig;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
@@ -104,7 +105,7 @@ public class DbSegmentPublisher implements SegmentPublisher
              .bind("created_date", new DateTime().toString())
              .bind("start", segment.getInterval().getStart().toString())
              .bind("end", segment.getInterval().getEnd().toString())
-             .bind("partitioned", segment.getShardSpec().getPartitionNum())
+             .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
              .bind("version", segment.getVersion())
              .bind("used", true)
              .bind("payload", jsonMapper.writeValueAsString(segment))