Merge pull request #500 from metamx/batch-ingestion-fixes

Batch ingestion fixes
This commit is contained in:
fjy 2014-04-22 17:59:24 -06:00
commit 2d1f33e59f
11 changed files with 190 additions and 81 deletions

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector; import io.druid.db.DbConnector;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
@ -80,7 +81,7 @@ public class DbUpdaterJob implements Jobby
.put("created_date", new DateTime().toString()) .put("created_date", new DateTime().toString())
.put("start", segment.getInterval().getStart().toString()) .put("start", segment.getInterval().getStart().toString())
.put("end", segment.getInterval().getEnd().toString()) .put("end", segment.getInterval().getEnd().toString())
.put("partitioned", segment.getShardSpec().getPartitionNum()) .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
.put("version", segment.getVersion()) .put("version", segment.getVersion())
.put("used", true) .put("used", true)
.put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment)) .put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))

View File

@ -37,6 +37,7 @@ import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -45,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@ -65,7 +67,6 @@ import java.util.Set;
*/ */
public class DetermineHashedPartitionsJob implements Jobby public class DetermineHashedPartitionsJob implements Jobby
{ {
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class); private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
private final HadoopDruidIndexerConfig config; private final HadoopDruidIndexerConfig config;
@ -98,8 +99,11 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputKeyClass(NullWritable.class); groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class); groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
if (!config.getSegmentGranularIntervals().isPresent()) { if (!config.getSegmentGranularIntervals().isPresent()) {
groupByJob.setNumReduceTasks(1); groupByJob.setNumReduceTasks(1);
} else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
} }
JobHelper.setupClasspath(config, groupByJob); JobHelper.setupClasspath(config, groupByJob);
@ -124,9 +128,6 @@ public class DetermineHashedPartitionsJob implements Jobby
if (!config.getSegmentGranularIntervals().isPresent()) { if (!config.getSegmentGranularIntervals().isPresent()) {
final Path intervalInfoPath = config.makeIntervalInfoPath(); final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration()); fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!fileSystem.exists(intervalInfoPath)) {
throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
}
List<Interval> intervals = config.jsonMapper.readValue( List<Interval> intervals = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>() Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
{ {
@ -144,21 +145,12 @@ public class DetermineHashedPartitionsJob implements Jobby
if (fileSystem == null) { if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration()); fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
} }
if (fileSystem.exists(partitionInfoPath)) { final Long cardinality = config.jsonMapper.readValue(
Long cardinality = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>() Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{ {
} }
); );
int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize()); final int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
if (numberOfShards > MAX_SHARDS) {
throw new ISE(
"Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
numberOfShards,
MAX_SHARDS
);
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards); List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
if (numberOfShards == 1) { if (numberOfShards == 1) {
@ -172,9 +164,6 @@ public class DetermineHashedPartitionsJob implements Jobby
shardSpecs.put(bucket, actualSpecs); shardSpecs.put(bucket, actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
} }
config.setShardSpecs(shardSpecs); config.setShardSpecs(shardSpecs);
log.info( log.info(
@ -319,13 +308,6 @@ public class DetermineHashedPartitionsJob implements Jobby
} }
} }
private byte[] getDataBytes(BytesWritable writable)
{
byte[] rv = new byte[writable.getLength()];
System.arraycopy(writable.getBytes(), 0, rv, 0, writable.getLength());
return rv;
}
@Override @Override
public void run(Context context) public void run(Context context)
throws IOException, InterruptedException throws IOException, InterruptedException
@ -353,6 +335,50 @@ public class DetermineHashedPartitionsJob implements Jobby
} }
} }
} }
public static class DetermineHashedPartitionsPartitioner
extends Partitioner<LongWritable, BytesWritable> implements Configurable
{
private Configuration config;
private boolean determineIntervals;
private Map<LongWritable, Integer> reducerLookup;
@Override
public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
{
if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
return 0;
} else {
return reducerLookup.get(interval);
}
}
@Override
public Configuration getConf()
{
return config;
}
@Override
public void setConf(Configuration config)
{
this.config = config;
HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfigBuilder.fromConfiguration(config);
if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
determineIntervals = false;
int reducerNumber = 0;
ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
}
reducerLookup = builder.build();
} else {
determineIntervals = true;
}
}
}
} }

View File

@ -215,7 +215,6 @@ public class DeterminePartitionsJob implements Jobby
if (fileSystem == null) { if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
} }
if (fileSystem.exists(partitionInfoPath)) {
List<ShardSpec> specs = config.jsonMapper.readValue( List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>() Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{ {
@ -229,9 +228,7 @@ public class DeterminePartitionsJob implements Jobby
} }
shardSpecs.put(segmentGranularity.getStart(), actualSpecs); shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
} }
config.setShardSpecs(shardSpecs); config.setShardSpecs(shardSpecs);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator; import org.joda.time.DateTimeComparator;
@ -56,14 +57,29 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
if (config.isDeterminingPartitions()) { if (config.isDeterminingPartitions()) {
jobs.add(config.getPartitionsSpec().getPartitionJob(config)); jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else { } else {
int shardsPerInterval = config.getPartitionsSpec().getNumShards();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance()); Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0; int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart(); DateTime bucket = segmentGranularity.getStart();
if (shardsPerInterval > 0) {
List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
for (int i = 0; i < shardsPerInterval; i++) {
specs.add(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(i, shardsPerInterval),
shardCount++
)
);
}
shardSpecs.put(bucket, specs);
log.info("DateTime[%s], spec[%s]", bucket, specs);
} else {
final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++); final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec)); shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec); log.info("DateTime[%s], spec[%s]", bucket, spec);
} }
}
config.setShardSpecs(shardSpecs); config.setShardSpecs(shardSpecs);
} }

View File

@ -20,6 +20,7 @@
package io.druid.indexer.partitions; package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
public abstract class AbstractPartitionsSpec implements PartitionsSpec public abstract class AbstractPartitionsSpec implements PartitionsSpec
@ -28,11 +29,13 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
private final long targetPartitionSize; private final long targetPartitionSize;
private final long maxPartitionSize; private final long maxPartitionSize;
private final boolean assumeGrouped; private final boolean assumeGrouped;
private final int numShards;
public AbstractPartitionsSpec( public AbstractPartitionsSpec(
Long targetPartitionSize, Long targetPartitionSize,
Long maxPartitionSize, Long maxPartitionSize,
Boolean assumeGrouped Boolean assumeGrouped,
Integer numShards
) )
{ {
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize; this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
@ -40,6 +43,11 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD) ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
: maxPartitionSize; : maxPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped; this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
this.numShards = numShards == null ? -1 : numShards;
Preconditions.checkArgument(
this.targetPartitionSize == -1 || this.numShards == -1,
"targetPartitionsSize and shardCount both cannot be set"
);
} }
@JsonProperty @JsonProperty
@ -65,4 +73,10 @@ public abstract class AbstractPartitionsSpec implements PartitionsSpec
{ {
return targetPartitionSize > 0; return targetPartitionSize > 0;
} }
@Override
public int getNumShards()
{
return numShards;
}
} }

View File

@ -33,10 +33,11 @@ public class HashedPartitionsSpec extends AbstractPartitionsSpec
public HashedPartitionsSpec( public HashedPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize, @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("numShards") @Nullable Integer numShards
) )
{ {
super(targetPartitionSize, maxPartitionSize, assumeGrouped); super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
} }
@Override @Override

View File

@ -49,4 +49,7 @@ public interface PartitionsSpec
@JsonIgnore @JsonIgnore
public boolean isDeterminingPartitions(); public boolean isDeterminingPartitions();
@JsonProperty
public int getNumShards();
} }

View File

@ -21,9 +21,6 @@ package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -35,9 +32,10 @@ public class RandomPartitionsSpec extends HashedPartitionsSpec
public RandomPartitionsSpec( public RandomPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize, @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize, @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
@JsonProperty("numShards") @Nullable Integer numShards
) )
{ {
super(targetPartitionSize, maxPartitionSize, assumeGrouped); super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
} }
} }

View File

@ -41,7 +41,7 @@ public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
) )
{ {
super(targetPartitionSize, maxPartitionSize, assumeGrouped); super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
this.partitionDimension = partitionDimension; this.partitionDimension = partitionDimension;
} }

View File

@ -503,7 +503,8 @@ public class HadoopDruidIndexerConfigTest
} }
@Test @Test
public void testRandomPartitionsSpec() throws Exception{ public void testRandomPartitionsSpec() throws Exception
{
{ {
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
@ -547,7 +548,8 @@ public class HadoopDruidIndexerConfigTest
} }
@Test @Test
public void testHashedPartitionsSpec() throws Exception{ public void testHashedPartitionsSpec() throws Exception
{
{ {
final HadoopDruidIndexerConfig cfg; final HadoopDruidIndexerConfig cfg;
@ -589,4 +591,54 @@ public class HadoopDruidIndexerConfigTest
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec); Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
} }
} }
@Test
public void testHashedPartitionsSpecShardCount() throws Exception
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"partitionsSpec\":{"
+ " \"type\":\"hashed\","
+ " \"numShards\":2"
+ " }"
+ "}",
HadoopDruidIndexerConfig.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
Assert.assertEquals(
"isDeterminingPartitions",
partitionsSpec.isDeterminingPartitions(),
false
);
Assert.assertEquals(
"getTargetPartitionSize",
partitionsSpec.getTargetPartitionSize(),
-1
);
Assert.assertEquals(
"getMaxPartitionSize",
partitionsSpec.getMaxPartitionSize(),
-1
);
Assert.assertEquals(
"shardCount",
partitionsSpec.getNumShards(),
2
);
Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
}
} }

View File

@ -25,6 +25,7 @@ import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector; import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig; import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
@ -104,7 +105,7 @@ public class DbSegmentPublisher implements SegmentPublisher
.bind("created_date", new DateTime().toString()) .bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString()) .bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString()) .bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", segment.getShardSpec().getPartitionNum()) .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
.bind("version", segment.getVersion()) .bind("version", segment.getVersion())
.bind("used", true) .bind("used", true)
.bind("payload", jsonMapper.writeValueAsString(segment)) .bind("payload", jsonMapper.writeValueAsString(segment))