Revert "Merge branch 'determine-partitions-improvements'"

This reverts commit 7ad228ceb5, reversing changes made to 9c55e2b779.
fjy 2014-02-14 12:47:34 -08:00
parent 5607edb813
commit 189b3e2b9b
29 changed files with 380 additions and 1280 deletions

View File

@@ -82,7 +82,6 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
 "segmentOutputPath": "s3n:\/\/billy-bucket\/the\/segments\/go\/here",
 "leaveIntermediate": "false",
 "partitionsSpec": {
-"type": "random"
 "targetPartitionSize": 5000000
 },
 "updaterJobSpec": {
@@ -146,20 +145,12 @@ The indexing process has the ability to roll data up as it processes the incomin
 ### Partitioning specification
-Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type.
-Druid supports two types of partitions spec - singleDimension and random.
-In SingleDimension partition type data is partitioned based on the values in that dimension.
-For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
-In random partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
-Random partition type is more efficient and gives better distribution of data.
+Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way. For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
 To use this option, the indexer must be given a target partition size. It can then find a good set of partition ranges on its own.
 |property|description|required?|
 |--------|-----------|---------|
-|type|type of partitionSpec to be used |no, default : singleDimension|
 |targetPartitionSize|target number of rows to include in a partition, should be a number that targets segments of 700MB\~1GB.|yes|
 |partitionDimension|the dimension to partition on. Leave blank to select a dimension automatically.|no|
 |assumeGrouped|assume input data has already been grouped on time and dimensions. This is faster, but can choose suboptimal partitions if the assumption is violated.|no|
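For illustration only (not part of this diff): a partitionsSpec limited to the documented properties binds directly onto the PartitionsSpec class restored by this revert through its @JsonCreator constructor. The values and class name below are made up for the example.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.partitions.PartitionsSpec;

public class PartitionsSpecExample
{
  public static void main(String[] args) throws Exception
  {
    // Example values only; targetPartitionSize is the row count to aim for per segment.
    final String json = "{\"targetPartitionSize\": 5000000, \"partitionDimension\": \"last_name\", \"assumeGrouped\": false}";
    final PartitionsSpec spec = new ObjectMapper().readValue(json, PartitionsSpec.class);
    System.out.println(spec.getTargetPartitionSize() + " rows, dimension=" + spec.getPartitionDimension());
  }
}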

View File

@@ -97,11 +97,6 @@
 <artifactId>junit</artifactId>
 <scope>test</scope>
 </dependency>
-<dependency>
-<groupId>com.clearspring.analytics</groupId>
-<artifactId>stream</artifactId>
-<version>2.5.2</version>
-</dependency>
 </dependencies>
 <build>

View File

@ -1,357 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Determines appropriate ShardSpecs for a job by determining approximate cardinality of data set using HyperLogLog
*/
public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
private final HadoopDruidIndexerConfig config;
public DetermineHashedPartitionsJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
}
public boolean run()
{
try {
/*
* Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
* in the final segment.
*/
long startTime = System.currentTimeMillis();
final Job groupByJob = new Job(
new Configuration(),
String.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
);
JobHelper.injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);
groupByJob.setReducerClass(DetermineCardinalityReducer.class);
groupByJob.setOutputKeyClass(NullWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setNumReduceTasks(1);
JobHelper.setupClasspath(config, groupByJob);
config.addInputPaths(groupByJob);
config.intoConfiguration(groupByJob);
FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
if (!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID());
return false;
}
/*
* Load partitions and intervals determined by the previous job.
*/
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
if (!config.getSegmentGranularIntervals().isPresent()) {
final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!fileSystem.exists(intervalInfoPath)) {
throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
}
List<Interval> intervals = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
{
}
);
config.setGranularitySpec(new UniformGranularitySpec(config.getGranularitySpec().getGranularity(), intervals));
log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals());
}
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
}
if (fileSystem.exists(partitionInfoPath)) {
Long cardinality = config.jsonMapper.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{
}
);
int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
if (numberOfShards > MAX_SHARDS) {
throw new ISE(
"Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
numberOfShards,
MAX_SHARDS
);
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
if (numberOfShards == 1) {
actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
} else {
for (int i = 0; i < numberOfShards; ++i) {
actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
}
shardSpecs.put(bucket, actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
}
config.setShardSpecs(shardSpecs);
log.info(
"DetermineHashedPartitionsJob took %d millis",
(System.currentTimeMillis() - startTime)
);
return true;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public static class DetermineCardinalityMapper extends HadoopDruidIndexerMapper<LongWritable, BytesWritable>
{
private static HashFunction hashFunction = Hashing.murmur3_128();
private QueryGranularity rollupGranularity = null;
private Map<Interval, HyperLogLog> hyperLogLogs;
private HadoopDruidIndexerConfig config;
private boolean determineIntervals;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
super.setup(context);
rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) {
determineIntervals = false;
final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
for (final Interval bucketInterval : intervals.get()) {
builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
}
hyperLogLogs = builder.build();
} else {
determineIntervals = true;
hyperLogLogs = Maps.newHashMap();
}
}
@Override
protected void innerMap(
InputRow inputRow,
Text text,
Context context
) throws IOException, InterruptedException
{
final List<Object> groupKey = Rows.toGroupKey(
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
inputRow
);
Interval interval;
if (determineIntervals) {
interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
if (!hyperLogLogs.containsKey(interval)) {
hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
}
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()));
if (!maybeInterval.isPresent()) {
throw new ISE("WTF?! No bucket found for timestamp: %s", inputRow.getTimestampFromEpoch());
}
interval = maybeInterval.get();
}
hyperLogLogs.get(interval)
.offerHashed(
hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
.asLong()
);
}
@Override
public void run(Context context) throws IOException, InterruptedException
{
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
context.write(
new LongWritable(entry.getKey().getStartMillis()),
new BytesWritable(entry.getValue().getBytes())
);
}
cleanup(context);
}
}
public static class DetermineCardinalityReducer
extends Reducer<LongWritable, BytesWritable, NullWritable, NullWritable>
{
private final List<Interval> intervals = Lists.newArrayList();
protected HadoopDruidIndexerConfig config = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
}
@Override
protected void reduce(
LongWritable key,
Iterable<BytesWritable> values,
Context context
) throws IOException, InterruptedException
{
HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
for (BytesWritable value : values) {
HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
try {
aggregate.addAll(logValue);
}
catch (CardinalityMergeException e) {
e.printStackTrace(); // TODO: check for better handling
}
}
Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
intervals.add(interval);
final Path outPath = config.makeSegmentPartitionInfoPath(interval);
final OutputStream out = Utils.makePathAndOutputStream(
context, outPath, config.isOverwriteFiles()
);
try {
HadoopDruidIndexerConfig.jsonMapper.writerWithType(
new TypeReference<Long>()
{
}
).writeValue(
out,
aggregate.cardinality()
);
}
finally {
Closeables.close(out, false);
}
}
@Override
public void run(Context context)
throws IOException, InterruptedException
{
super.run(context);
if (!config.getSegmentGranularIntervals().isPresent()) {
final Path outPath = config.makeIntervalInfoPath();
final OutputStream out = Utils.makePathAndOutputStream(
context, outPath, config.isOverwriteFiles()
);
try {
HadoopDruidIndexerConfig.jsonMapper.writerWithType(
new TypeReference<List<Interval>>()
{
}
).writeValue(
out,
intervals
);
}
finally {
Closeables.close(out, false);
}
}
}
}
}
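The heart of the hashed-partitioning decision in the deleted class above is the shard-count arithmetic applied to the HyperLogLog cardinality estimate. A standalone sketch of just that step (class, method name, and exception type are illustrative):

public class ShardCountSketch
{
  // Mirrors the calculation in DetermineHashedPartitionsJob: one shard per targetPartitionSize rows,
  // rounded up, and capped at the maximum shard count.
  static int computeShardCount(long estimatedCardinality, long targetPartitionSize, int maxShards)
  {
    final int numberOfShards = (int) Math.ceil((double) estimatedCardinality / targetPartitionSize);
    if (numberOfShards > maxShards) {
      throw new IllegalStateException(
          String.format("Number of shards [%d] exceeds the maximum limit of [%d]", numberOfShards, maxShards)
      );
    }
    return numberOfShards;
  }
}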

View File

@ -26,7 +26,9 @@ import com.google.common.base.Optional;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -38,9 +40,7 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.collections.CombiningIterable; import io.druid.collections.CombiningIterable;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec;
@ -76,6 +76,7 @@ import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so, * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
@ -106,6 +107,16 @@ public class DeterminePartitionsJob implements Jobby
this.config = config; this.config = config;
} }
public static void injectSystemProperties(Job job)
{
final Configuration conf = job.getConfiguration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
}
public boolean run() public boolean run()
{ {
try { try {
@ -114,17 +125,13 @@ public class DeterminePartitionsJob implements Jobby
* in the final segment. * in the final segment.
*/ */
if(!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)){
throw new ISE("DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]", config.getPartitionsSpec());
}
if (!config.getPartitionsSpec().isAssumeGrouped()) { if (!config.getPartitionsSpec().isAssumeGrouped()) {
final Job groupByJob = new Job( final Job groupByJob = new Job(
new Configuration(), new Configuration(),
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals()) String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
); );
JobHelper.injectSystemProperties(groupByJob); injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class); groupByJob.setInputFormatClass(TextInputFormat.class);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class); groupByJob.setMapOutputKeyClass(BytesWritable.class);
@ -161,7 +168,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19"); dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
JobHelper.injectSystemProperties(dimSelectionJob); injectSystemProperties(dimSelectionJob);
if (!config.getPartitionsSpec().isAssumeGrouped()) { if (!config.getPartitionsSpec().isAssumeGrouped()) {
// Read grouped data from the groupByJob. // Read grouped data from the groupByJob.
@ -183,7 +190,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputValueClass(Text.class); dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class); dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size()); dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().size());
JobHelper.setupClasspath(config, dimSelectionJob); JobHelper.setupClasspath(config, dimSelectionJob);
config.intoConfiguration(dimSelectionJob); config.intoConfiguration(dimSelectionJob);
@ -209,8 +216,10 @@ public class DeterminePartitionsJob implements Jobby
FileSystem fileSystem = null; FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance()); Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0; int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
if (fileSystem == null) { if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
} }
@ -224,10 +233,10 @@ public class DeterminePartitionsJob implements Jobby
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (int i = 0; i < specs.size(); ++i) { for (int i = 0; i < specs.size(); ++i) {
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++)); actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i)); log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
} }
shardSpecs.put(segmentGranularity.getStart(), actualSpecs); shardSpecs.put(bucket, actualSpecs);
} else { } else {
log.info("Path[%s] didn't exist!?", partitionInfoPath); log.info("Path[%s] didn't exist!?", partitionInfoPath);
} }
@ -260,9 +269,17 @@ public class DeterminePartitionsJob implements Jobby
Context context Context context
) throws IOException, InterruptedException ) throws IOException, InterruptedException
{ {
final List<Object> groupKey = Rows.toGroupKey( // Create group key, there are probably more efficient ways of doing this
final Map<String, Set<String>> dims = Maps.newTreeMap();
for (final String dim : inputRow.getDimensions()) {
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
if (dimValues.size() > 0) {
dims.put(dim, dimValues);
}
}
final List<Object> groupKey = ImmutableList.of(
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()), rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
inputRow dims
); );
context.write( context.write(
new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)), new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)),
@ -298,8 +315,8 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException throws IOException, InterruptedException
{ {
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec(); final String partitionDimension = config.getPartitionDimension();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension()); helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
} }
@Override @Override
@ -330,8 +347,8 @@ public class DeterminePartitionsJob implements Jobby
{ {
super.setup(context); super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec(); final String partitionDimension = config.getPartitionDimension();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension()); helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
} }
@Override @Override
@ -366,7 +383,7 @@ public class DeterminePartitionsJob implements Jobby
final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder(); final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0; int idx = 0;
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) { for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
timeIndexBuilder.put(bucketInterval.getStart(), idx); timeIndexBuilder.put(bucketInterval.getStart(), idx);
idx++; idx++;
} }
@ -724,7 +741,7 @@ public class DeterminePartitionsJob implements Jobby
} }
final OutputStream out = Utils.makePathAndOutputStream( final OutputStream out = Utils.makePathAndOutputStream(
context, config.makeSegmentPartitionInfoPath(config.getGranularitySpec().getGranularity().bucket(bucket)), config.isOverwriteFiles() context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
); );
final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD
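The group-key change above swaps the Rows.toGroupKey call back out for the original hand-built key: a sorted map of dimension names to sorted value sets, paired with the truncated timestamp. A self-contained sketch of that construction using plain JDK collections (types simplified, not project code):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class GroupKeySketch
{
  // Builds [truncatedTimestamp, {dimension -> sorted values}] in the spirit of the restored mapper code;
  // empty dimensions are dropped so they do not affect the key.
  static List<Object> toGroupKey(long truncatedTimestamp, Map<String, List<String>> row)
  {
    final Map<String, Set<String>> dims = new TreeMap<>();
    for (Map.Entry<String, List<String>> dim : row.entrySet()) {
      final Set<String> dimValues = new TreeSet<>(dim.getValue());
      if (!dimValues.isEmpty()) {
        dims.put(dim.getKey(), dimValues);
      }
    }
    return Arrays.asList(truncatedTimestamp, dims);
  }
}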

View File

@ -1,74 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopDruidDetermineConfigurationJob implements Jobby
{
private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
private final HadoopDruidIndexerConfig config;
@Inject
public HadoopDruidDetermineConfigurationJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
}
@Override
public boolean run()
{
List<Jobby> jobs = Lists.newArrayList();
JobHelper.ensurePaths(config);
if (config.isDeterminingPartitions()) {
jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else {
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
config.setShardSpecs(shardSpecs);
}
return JobHelper.runJobs(jobs, config);
}
}

View File

@ -50,7 +50,6 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.granularity.GranularitySpec; import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.granularity.UniformGranularitySpec; import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.path.PathSpec; import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec; import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec; import io.druid.indexer.updater.DbUpdaterJobSpec;
@ -74,7 +73,6 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedSet;
/** /**
*/ */
@ -180,7 +178,7 @@ public class HadoopDruidIndexerConfig
this.partitionsSpec = partitionsSpec; this.partitionsSpec = partitionsSpec;
} else { } else {
// Backwards compatibility // Backwards compatibility
this.partitionsSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false); this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
} }
if (granularitySpec != null) { if (granularitySpec != null) {
@ -380,17 +378,17 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows; this.ignoreInvalidRows = ignoreInvalidRows;
} }
public Optional<List<Interval>> getIntervals() public List<Interval> getIntervals()
{ {
Optional<SortedSet<Interval>> setOptional = getGranularitySpec().bucketIntervals(); return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
if (setOptional.isPresent()) {
return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
} else {
return Optional.absent();
}
} }
public boolean isDeterminingPartitions() public String getPartitionDimension()
{
return partitionsSpec.getPartitionDimension();
}
public boolean partitionByDimension()
{ {
return partitionsSpec.isDeterminingPartitions(); return partitionsSpec.isDeterminingPartitions();
} }
@ -485,70 +483,64 @@ public class HadoopDruidIndexerConfig
throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards); throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
} }
public Optional<Set<Interval>> getSegmentGranularIntervals() public Set<Interval> getSegmentGranularIntervals()
{ {
return Optional.fromNullable((Set<Interval>) granularitySpec.bucketIntervals().orNull()); return granularitySpec.bucketIntervals();
} }
public Optional<Iterable<Bucket>> getAllBuckets() public Iterable<Bucket> getAllBuckets()
{ {
Optional<Set<Interval>> intervals = getSegmentGranularIntervals(); return FunctionalIterable
if (intervals.isPresent()) { .create(getSegmentGranularIntervals())
return Optional.of( .transformCat(
(Iterable<Bucket>) FunctionalIterable new Function<Interval, Iterable<Bucket>>()
.create(intervals.get()) {
.transformCat( @Override
new Function<Interval, Iterable<Bucket>>() public Iterable<Bucket> apply(Interval input)
{ {
@Override final DateTime bucketTime = input.getStart();
public Iterable<Bucket> apply(Interval input) final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime);
{ if (specs == null) {
final DateTime bucketTime = input.getStart(); return ImmutableList.of();
final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime); }
if (specs == null) {
return ImmutableList.of();
}
return FunctionalIterable return FunctionalIterable
.create(specs) .create(specs)
.transform( .transform(
new Function<HadoopyShardSpec, Bucket>() new Function<HadoopyShardSpec, Bucket>()
{ {
int i = 0; int i = 0;
@Override @Override
public Bucket apply(HadoopyShardSpec input) public Bucket apply(HadoopyShardSpec input)
{ {
return new Bucket(input.getShardNum(), bucketTime, i++); return new Bucket(input.getShardNum(), bucketTime, i++);
} }
} }
); );
} }
} }
) );
);
} else {
return Optional.absent();
}
} }
/****************************************** /******************************************
Path helper logic Path helper logic
******************************************/ ******************************************/
/**
* Make the intermediate path for this job run.
*
* @return the intermediate path for this job run.
*/
/**
* Make the intermediate path for this job run.
*
* @return the intermediate path for this job run.
*/
public Path makeIntermediatePath() public Path makeIntermediatePath()
{ {
return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", ""))); return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", "")));
} }
public Path makeSegmentPartitionInfoPath(Interval bucketInterval) public Path makeSegmentPartitionInfoPath(Bucket bucket)
{ {
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
return new Path( return new Path(
String.format( String.format(
"%s/%s_%s/partitions.json", "%s/%s_%s/partitions.json",
@ -559,16 +551,6 @@ public class HadoopDruidIndexerConfig
); );
} }
public Path makeIntervalInfoPath()
{
return new Path(
String.format(
"%s/intervals.json",
makeIntermediatePath()
)
);
}
public Path makeDescriptorInfoDir() public Path makeDescriptorInfoDir()
{ {
return new Path(makeIntermediatePath(), "segmentDescriptorInfo"); return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
@ -643,5 +625,8 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath"); Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
Preconditions.checkNotNull(version, "version"); Preconditions.checkNotNull(version, "version");
Preconditions.checkNotNull(rollupSpec, "rollupSpec"); Preconditions.checkNotNull(rollupSpec, "rollupSpec");
final int nIntervals = getIntervals().size();
Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals);
} }
} }
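The getAllBuckets() change above removes the Optional wrapper but keeps the same expansion: every shard spec of every time bucket becomes one Bucket, numbered within its bucket. A simplified sketch of that expansion (descriptor strings stand in for the Bucket type):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BucketExpansionSketch
{
  // shardNumsByBucketStart: bucket start millis -> the global shard numbers assigned to that bucket.
  // Emits one descriptor per (bucket, shard) pair, mirroring new Bucket(shardNum, bucketTime, i++).
  static List<String> allBuckets(Map<Long, List<Integer>> shardNumsByBucketStart)
  {
    final List<String> buckets = new ArrayList<>();
    for (Map.Entry<Long, List<Integer>> entry : shardNumsByBucketStart.entrySet()) {
      int partitionNum = 0;
      for (Integer shardNum : entry.getValue()) {
        buckets.add(String.format("Bucket[shardNum=%d, time=%d, partitionNum=%d]", shardNum, entry.getKey(), partitionNum++));
      }
    }
    return buckets;
  }
}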

View File

@@ -46,10 +46,6 @@ public class HadoopDruidIndexerConfigBuilder
 return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
 }
-public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config){
-return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopDruidIndexerSchema.class);
-}
 public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
 {
 return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);

View File

@ -19,20 +19,34 @@
package io.druid.indexer; package io.druid.indexer;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
*/ */
public class HadoopDruidIndexerJob implements Jobby public class HadoopDruidIndexerJob implements Jobby
{ {
private static final Logger log = new Logger(HadoopDruidIndexerJob.class); private static final Logger log = new Logger(HadoopDruidIndexerJob.class);
private final HadoopDruidIndexerConfig config; private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob; private final DbUpdaterJob dbUpdaterJob;
private IndexGeneratorJob indexJob; private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null; private volatile List<DataSegment> publishedSegments = null;
@ -55,7 +69,23 @@ public class HadoopDruidIndexerJob implements Jobby
public boolean run() public boolean run()
{ {
List<Jobby> jobs = Lists.newArrayList(); List<Jobby> jobs = Lists.newArrayList();
JobHelper.ensurePaths(config);
ensurePaths();
if (config.partitionByDimension()) {
jobs.add(new DeterminePartitionsJob(config));
}
else {
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
DateTime bucket = segmentGranularity.getStart();
final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
config.setShardSpecs(shardSpecs);
}
indexJob = new IndexGeneratorJob(config); indexJob = new IndexGeneratorJob(config);
jobs.add(indexJob); jobs.add(indexJob);
@ -66,15 +96,65 @@ public class HadoopDruidIndexerJob implements Jobby
log.info("No updaterJobSpec set, not uploading to database"); log.info("No updaterJobSpec set, not uploading to database");
} }
String failedMessage = null;
for (Jobby job : jobs) {
if (failedMessage == null) {
if (!job.run()) {
failedMessage = String.format("Job[%s] failed!", job.getClass());
}
}
}
if (failedMessage == null) {
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
}
if (!config.isLeaveIntermediate()) {
if (failedMessage == null || config.isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
}
catch (IOException e) {
log.error(e, "Failed to cleanup path[%s]", workingPath);
}
}
}
if (failedMessage != null) {
throw new ISE(failedMessage);
}
JobHelper.runJobs(jobs, config);
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
return true; return true;
} }
public List<DataSegment> getPublishedSegments() private void ensurePaths()
{ {
if (publishedSegments == null) { // config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else
try {
Job job = new Job(
new Configuration(),
String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
);
job.getConfiguration().set("io.sort.record.percent", "0.19");
for (String propName : System.getProperties().stringPropertyNames()) {
Configuration conf = job.getConfiguration();
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
config.addInputPaths(job);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public List<DataSegment> getPublishedSegments() {
if(publishedSegments == null) {
throw new IllegalStateException("Job hasn't run yet. No segments have been published yet."); throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
} }
return publishedSegments; return publishedSegments;
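Both ensurePaths() above and the JobHelper.injectSystemProperties() removed later in this diff copy "hadoop."-prefixed JVM system properties into the job configuration. A minimal sketch of that copy:

import org.apache.hadoop.conf.Configuration;

public class SystemPropertySketch
{
  // For every system property that starts with "hadoop.", strip the prefix and set it on the
  // Hadoop Configuration, e.g. -Dhadoop.mapred.job.tracker=... becomes mapred.job.tracker.
  static void injectSystemProperties(Configuration conf)
  {
    for (String propName : System.getProperties().stringPropertyNames()) {
      if (propName.startsWith("hadoop.")) {
        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
      }
    }
  }
}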

View File

@@ -22,7 +22,6 @@ package io.druid.indexer;
 import com.metamx.common.RE;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.StringInputRowParser;
-import io.druid.indexer.granularity.GranularitySpec;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -71,9 +70,8 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
 throw e;
 }
 }
-GranularitySpec spec = config.getGranularitySpec();
-if (!spec.bucketIntervals().isPresent() || spec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
-.isPresent()) {
+if(config.getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())).isPresent()) {
 innerMap(inputRow, value, context);
 }
 }

View File

@ -44,7 +44,6 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -85,7 +84,9 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby public class IndexGeneratorJob implements Jobby
{ {
private static final Logger log = new Logger(IndexGeneratorJob.class); private static final Logger log = new Logger(IndexGeneratorJob.class);
private final HadoopDruidIndexerConfig config; private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats; private IndexGeneratorStats jobStats;
public IndexGeneratorJob( public IndexGeneratorJob(
@ -96,6 +97,65 @@ public class IndexGeneratorJob implements Jobby
this.jobStats = new IndexGeneratorStats(); this.jobStats = new IndexGeneratorStats();
} }
public IndexGeneratorStats getJobStats()
{
return jobStats;
}
public boolean run()
{
try {
Job job = new Job(
new Configuration(),
String.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals())
);
job.getConfiguration().set("io.sort.record.percent", "0.23");
for (String propName : System.getProperties().stringPropertyNames()) {
Configuration conf = job.getConfiguration();
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(Text.class);
SortableBytes.useSortableBytesAsMapOutputKey(job);
job.setNumReduceTasks(Iterables.size(config.getAllBuckets()));
job.setPartitionerClass(IndexGeneratorPartitioner.class);
job.setReducerClass(IndexGeneratorReducer.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
config.addInputPaths(job);
config.intoConfiguration(job);
JobHelper.setupClasspath(config, job);
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
boolean success = job.waitForCompletion(true);
Counter invalidRowCount = job.getCounters()
.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
jobStats.setInvalidRowCount(invalidRowCount.getValue());
return success;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{ {
@ -129,60 +189,6 @@ public class IndexGeneratorJob implements Jobby
return publishedSegments; return publishedSegments;
} }
public IndexGeneratorStats getJobStats()
{
return jobStats;
}
public boolean run()
{
try {
Job job = new Job(
new Configuration(),
String.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals())
);
job.getConfiguration().set("io.sort.record.percent", "0.23");
JobHelper.injectSystemProperties(job);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(Text.class);
SortableBytes.useSortableBytesAsMapOutputKey(job);
job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
job.setPartitionerClass(IndexGeneratorPartitioner.class);
job.setReducerClass(IndexGeneratorReducer.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
config.addInputPaths(job);
config.intoConfiguration(job);
JobHelper.setupClasspath(config, job);
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
boolean success = job.waitForCompletion(true);
Counter invalidRowCount = job.getCounters()
.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
jobStats.setInvalidRowCount(invalidRowCount.getValue());
return success;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text> public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{ {
@ -210,9 +216,8 @@ public class IndexGeneratorJob implements Jobby
} }
} }
public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text> implements Configurable public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text>
{ {
private Configuration config;
@Override @Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
@ -220,27 +225,12 @@ public class IndexGeneratorJob implements Jobby
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes()); final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes bytes.position(4); // Skip length added by SortableBytes
int shardNum = bytes.getInt(); int shardNum = bytes.getInt();
if (config.get("mapred.job.tracker").equals("local")) {
return shardNum % numPartitions;
} else {
if (shardNum >= numPartitions) {
throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
}
return shardNum;
if (shardNum >= numPartitions) {
throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
} }
}
@Override return shardNum;
public Configuration getConf()
{
return config;
}
@Override
public void setConf(Configuration config)
{
this.config = config;
} }
} }
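The partitioner change above drops the local-mode modulo branch; what remains is reading the shard number that SortableBytes placed after its 4-byte length prefix. A sketch of that extraction on a raw key:

import java.nio.ByteBuffer;

public class ShardPartitionSketch
{
  static int getPartition(byte[] sortableKeyBytes, int numPartitions)
  {
    final ByteBuffer bytes = ByteBuffer.wrap(sortableKeyBytes);
    bytes.position(4); // skip the length prepended by SortableBytes
    final int shardNum = bytes.getInt();
    if (shardNum >= numPartitions) {
      throw new IllegalStateException(
          String.format("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions)
      );
    }
    return shardNum;
  }
}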

View File

@ -20,11 +20,9 @@
package io.druid.indexer; package io.druid.indexer;
import com.google.api.client.util.Sets; import com.google.api.client.util.Sets;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.io.OutputSupplier; import com.google.common.io.OutputSupplier;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.filecache.DistributedCache;
@ -36,7 +34,6 @@ import org.apache.hadoop.mapreduce.Job;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -97,63 +94,4 @@ public class JobHelper
} }
} }
} }
public static void injectSystemProperties(Job job)
{
final Configuration conf = job.getConfiguration();
for (String propName : System.getProperties().stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
}
}
}
public static void ensurePaths(HadoopDruidIndexerConfig config)
{
// config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else
try {
Job job = new Job(
new Configuration(),
String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
);
job.getConfiguration().set("io.sort.record.percent", "0.19");
injectSystemProperties(job);
config.addInputPaths(job);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config){
String failedMessage = null;
for (Jobby job : jobs) {
if (failedMessage == null) {
if (!job.run()) {
failedMessage = String.format("Job[%s] failed!", job.getClass());
}
}
}
if (!config.isLeaveIntermediate()) {
if (failedMessage == null || config.isCleanupOnFailure()) {
Path workingPath = config.makeIntermediatePath();
log.info("Deleting path[%s]", workingPath);
try {
workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
}
catch (IOException e) {
log.error(e, "Failed to cleanup path[%s]", workingPath);
}
}
}
if (failedMessage != null) {
throw new ISE(failedMessage);
}
return true;
}
} }
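JobHelper.runJobs(), removed here and folded back into HadoopDruidIndexerJob.run(), runs the jobs in order, remembers the first failure, and only cleans up the intermediate path when allowed. A simplified sketch of that control flow (the Jobby interface and cleanup hook are stubbed in):

import java.util.List;

public class RunJobsSketch
{
  interface Jobby { boolean run(); }

  static boolean runJobs(List<Jobby> jobs, Runnable deleteIntermediatePath, boolean leaveIntermediate, boolean cleanupOnFailure)
  {
    String failedMessage = null;
    for (Jobby job : jobs) {
      if (failedMessage == null && !job.run()) {
        failedMessage = String.format("Job[%s] failed!", job.getClass());
      }
    }
    if (!leaveIntermediate && (failedMessage == null || cleanupOnFailure)) {
      deleteIntermediatePath.run(); // e.g. delete the working path on the Hadoop FileSystem
    }
    if (failedMessage != null) {
      throw new IllegalStateException(failedMessage);
    }
    return true;
  }
}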

View File

@@ -72,9 +72,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
 @Override
 @JsonProperty("intervals")
-public Optional<SortedSet<Interval>> bucketIntervals()
+public SortedSet<Interval> bucketIntervals()
 {
-return Optional.of((SortedSet<Interval>) intervals);
+return intervals;
 }
 @Override

View File

@@ -40,7 +40,7 @@ import java.util.SortedSet;
 public interface GranularitySpec
 {
 /** Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.*/
-public Optional<SortedSet<Interval>> bucketIntervals();
+public SortedSet<Interval> bucketIntervals();
 /** Time-grouping interval corresponding to some instant, if any. */
 public Optional<Interval> bucketInterval(DateTime dt);

View File

@@ -35,7 +35,7 @@ import java.util.SortedSet;
 public class UniformGranularitySpec implements GranularitySpec
 {
 final private Granularity granularity;
-final private Iterable<Interval> inputIntervals;
+final private List<Interval> inputIntervals;
 final private ArbitraryGranularitySpec wrappedSpec;
 @JsonCreator
@@ -44,28 +44,21 @@
 @JsonProperty("intervals") List<Interval> inputIntervals
 )
 {
-this.granularity = granularity;
-if (inputIntervals != null) {
-List<Interval> granularIntervals = Lists.newArrayList();
-for (Interval inputInterval : inputIntervals) {
-Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
-}
-this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
-} else {
-this.inputIntervals = null;
-this.wrappedSpec = null;
-}
+List<Interval> granularIntervals = Lists.newArrayList();
+for (Interval inputInterval : inputIntervals) {
+Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
+}
+this.granularity = granularity;
+this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
 }
 @Override
-public Optional<SortedSet<Interval>> bucketIntervals()
+public SortedSet<Interval> bucketIntervals()
 {
-if (wrappedSpec == null) {
-return Optional.absent();
-} else {
-return wrappedSpec.bucketIntervals();
-}
+return wrappedSpec.bucketIntervals();
 }
 @Override
@@ -82,8 +75,8 @@ public class UniformGranularitySpec implements GranularitySpec
 }
 @JsonProperty("intervals")
-public Optional<Iterable<Interval>> getIntervals()
+public Iterable<Interval> getIntervals()
 {
-return Optional.fromNullable(inputIntervals);
+return inputIntervals;
 }
 }
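UniformGranularitySpec expands each input interval into granularity-sized bucket intervals via granularity.getIterable(). The sketch below does the same thing for an hourly granularity with plain Joda-Time, purely as an illustration of that expansion:

import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;

public class HourlyBucketsSketch
{
  static List<Interval> hourlyBuckets(Interval inputInterval)
  {
    final List<Interval> buckets = new ArrayList<>();
    // Start from the hour floor of the input interval and step one hour at a time.
    DateTime cursor = inputInterval.getStart().hourOfDay().roundFloorCopy();
    while (cursor.isBefore(inputInterval.getEnd())) {
      buckets.add(new Interval(cursor, cursor.plusHours(1)));
      cursor = cursor.plusHours(1);
    }
    return buckets;
  }
}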

View File

@ -1,68 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonProperty;
public abstract class AbstractPartitionsSpec implements PartitionsSpec
{
private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
private final long targetPartitionSize;
private final long maxPartitionSize;
private final boolean assumeGrouped;
public AbstractPartitionsSpec(
Long targetPartitionSize,
Long maxPartitionSize,
Boolean assumeGrouped
)
{
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
this.maxPartitionSize = maxPartitionSize == null
? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
: maxPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
}
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public long getMaxPartitionSize()
{
return maxPartitionSize;
}
@JsonProperty
public boolean isAssumeGrouped()
{
return assumeGrouped;
}
@Override
public boolean isDeterminingPartitions()
{
return targetPartitionSize > 0;
}
}

View File

@@ -19,33 +19,69 @@
 package io.druid.indexer.partitions;
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
+import javax.annotation.Nullable;
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
-@JsonSubTypes(value = {
-@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
-@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class)
-})
-public interface PartitionsSpec
+public class PartitionsSpec
 {
-@JsonIgnore
-public Jobby getPartitionJob(HadoopDruidIndexerConfig config);
-@JsonProperty
-public long getTargetPartitionSize();
-@JsonProperty
-public long getMaxPartitionSize();
-@JsonProperty
-public boolean isAssumeGrouped();
+private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
+@Nullable
+private final String partitionDimension;
+private final long targetPartitionSize;
+private final long maxPartitionSize;
+private final boolean assumeGrouped;
+@JsonCreator
+public PartitionsSpec(
+@JsonProperty("partitionDimension") @Nullable String partitionDimension,
+@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
+@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+)
+{
+this.partitionDimension = partitionDimension;
+this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
+this.maxPartitionSize = maxPartitionSize == null
+? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
+: maxPartitionSize;
+this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+}
 @JsonIgnore
-public boolean isDeterminingPartitions();
+public boolean isDeterminingPartitions()
+{
+return targetPartitionSize > 0;
+}
+@JsonProperty
+@Nullable
+public String getPartitionDimension()
+{
+return partitionDimension;
+}
+@JsonProperty
+public long getTargetPartitionSize()
+{
+return targetPartitionSize;
+}
+@JsonProperty
+public long getMaxPartitionSize()
+{
+return maxPartitionSize;
+}
+@JsonProperty
+public boolean isAssumeGrouped()
+{
+return assumeGrouped;
+}
 }
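The defaulting rules in the restored constructor above are easy to miss, so here they are spelled out on their own (illustrative helper, not project code):

public class PartitionSizeDefaultsSketch
{
  // targetPartitionSize <= 0 (the default when absent) disables partition determination entirely;
  // maxPartitionSize defaults to 1.5x the target when it is not supplied.
  static long effectiveMaxPartitionSize(Long targetPartitionSize, Long maxPartitionSize)
  {
    final long target = targetPartitionSize == null ? -1 : targetPartitionSize;
    return maxPartitionSize == null ? (long) (target * 1.5) : maxPartitionSize;
  }
}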

View File

@ -1,47 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class RandomPartitionsSpec extends AbstractPartitionsSpec
{
@JsonCreator
public RandomPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}
}
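
For contrast with the restored concrete class above, the code being removed here selected a partitioning strategy by a "type" field on the PartitionsSpec interface: "random" mapped to RandomPartitionsSpec (which hands partition determination to DetermineHashedPartitionsJob), while omitting "type" fell back to SingleDimensionPartitionsSpec as the declared defaultImpl. A sketch of that pre-revert behaviour (the standalone mapper is an illustrative assumption):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;

class RandomPartitionsSpecSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // "type":"random" picks the subtype registered on the (removed) interface.
    PartitionsSpec spec = mapper.readValue(
        "{\"type\":\"random\",\"targetPartitionSize\":100}",
        PartitionsSpec.class
    );

    System.out.println(spec instanceof RandomPartitionsSpec);  // true
    System.out.println(spec.getMaxPartitionSize());            // 150, i.e. 100 * 1.5
  }
}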

View File

@ -1,60 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DeterminePartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
{
@Nullable
private final String partitionDimension;
@JsonCreator
public SingleDimensionPartitionsSpec(
@JsonProperty("partitionDimension") @Nullable String partitionDimension,
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
this.partitionDimension = partitionDimension;
}
@JsonProperty
@Nullable
public String getPartitionDimension()
{
return partitionDimension;
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DeterminePartitionsJob(config);
}
}

View File

@ -20,7 +20,6 @@
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
@ -100,12 +99,9 @@ public class GranularityPathSpec implements PathSpec
  public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
  {
    final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervals());
-    Optional<Set<Interval>> optionalIntervals = config.getSegmentGranularIntervals();
-    if (optionalIntervals.isPresent()) {
-      for (Interval segmentInterval : optionalIntervals.get()) {
-        for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
-          intervals.add(dataInterval);
-        }
+    for (Interval segmentInterval : config.getSegmentGranularIntervals()) {
+      for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
+        intervals.add(dataInterval);
      }
    }

View File

@ -25,8 +25,6 @@ import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
-import io.druid.indexer.partitions.RandomPartitionsSpec;
-import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
@ -67,7 +65,7 @@ public class HadoopDruidIndexerConfigTest
    Assert.assertEquals(
        "getIntervals",
        Lists.newArrayList(new Interval("2012-01-01/P1D")),
-        granularitySpec.getIntervals().get()
+        granularitySpec.getIntervals()
    );
    Assert.assertEquals(
@ -101,7 +99,7 @@ public class HadoopDruidIndexerConfigTest
    Assert.assertEquals(
        "getIntervals",
        Lists.newArrayList(new Interval("2012-02-01/P1D")),
-        granularitySpec.getIntervals().get()
+        granularitySpec.getIntervals()
    );
    Assert.assertEquals(
@ -169,14 +167,15 @@ public class HadoopDruidIndexerConfigTest
        100
    );
-    Assert.assertTrue(
-        "partitionSpec",
-        partitionsSpec instanceof SingleDimensionPartitionsSpec
+    Assert.assertEquals(
+        "getPartitionDimension",
+        partitionsSpec.getPartitionDimension(),
+        null
    );
  }
  @Test
-  public void testPartitionsSpecSpecificDimensionLegacy()
+  public void testPartitionsSpecSpecificDimension()
  {
    final HadoopDruidIndexerConfig cfg;
@ -215,10 +214,9 @@ public class HadoopDruidIndexerConfigTest
        150
    );
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
    Assert.assertEquals(
        "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        partitionsSpec.getPartitionDimension(),
        "foo"
    );
  }
@ -261,10 +259,9 @@ public class HadoopDruidIndexerConfigTest
        150
    );
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
    Assert.assertEquals(
        "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        partitionsSpec.getPartitionDimension(),
        "foo"
    );
  }
@ -310,10 +307,9 @@ public class HadoopDruidIndexerConfigTest
        200
    );
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
    Assert.assertEquals(
        "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+        partitionsSpec.getPartitionDimension(),
        "foo"
    );
  }
@ -424,6 +420,7 @@ public class HadoopDruidIndexerConfigTest
    );
  }
  @Test
  public void shouldMakeHDFSCompliantSegmentOutputPath()
  {
@ -500,47 +497,4 @@ public class HadoopDruidIndexerConfigTest
      throw Throwables.propagate(e);
    }
  }
-  public void testRandomPartitionsSpec() throws Exception{
-  {
-    final HadoopDruidIndexerConfig cfg;
-    try {
-      cfg = jsonReadWriteRead(
-          "{"
-          + "\"partitionsSpec\":{"
-          + " \"targetPartitionSize\":100,"
-          + " \"type\":\"random\""
-          + " }"
-          + "}",
-          HadoopDruidIndexerConfig.class
-      );
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
-    Assert.assertEquals(
-        "isDeterminingPartitions",
-        partitionsSpec.isDeterminingPartitions(),
-        true
-    );
-    Assert.assertEquals(
-        "getTargetPartitionSize",
-        partitionsSpec.getTargetPartitionSize(),
-        100
-    );
-    Assert.assertEquals(
-        "getMaxPartitionSize",
-        partitionsSpec.getMaxPartitionSize(),
-        150
-    );
-    Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
-  }
}

View File

@ -54,7 +54,7 @@ public class ArbitraryGranularityTest
            new Interval("2012-01-08T00Z/2012-01-11T00Z"),
            new Interval("2012-02-01T00Z/2012-03-01T00Z")
        ),
-        Lists.newArrayList(spec.bucketIntervals().get())
+        Lists.newArrayList(spec.bucketIntervals())
    );
    Assert.assertEquals(

View File

@ -57,7 +57,7 @@ public class UniformGranularityTest
            new Interval("2012-01-09T00Z/P1D"),
            new Interval("2012-01-10T00Z/P1D")
        ),
-        Lists.newArrayList(spec.bucketIntervals().get())
+        Lists.newArrayList(spec.bucketIntervals())
    );
    Assert.assertEquals(

View File

@ -24,23 +24,25 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
-import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
-import io.druid.indexer.Jobby;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
@ -49,26 +51,30 @@ import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
-import java.util.SortedSet;
+import java.util.Map;
-public class HadoopIndexTask extends AbstractTask
+public class HadoopIndexTask extends AbstractFixedIntervalTask
{
  private static final Logger log = new Logger(HadoopIndexTask.class);
-  private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
  private static final ExtensionsConfig extensionsConfig;
  static {
    extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
  }
+  private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
  @JsonIgnore
  private final HadoopDruidIndexerSchema schema;
  @JsonIgnore
  private final String hadoopCoordinates;
@ -91,7 +97,13 @@ public class HadoopIndexTask extends AbstractTask
  {
    super(
        id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
-        schema.getDataSource()
+        schema.getDataSource(),
+        JodaUtils.umbrellaInterval(
+            JodaUtils.condenseIntervals(
+                schema.getGranularitySpec()
+                      .bucketIntervals()
+            )
+        )
    );
    // Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
@ -109,22 +121,6 @@ public class HadoopIndexTask extends AbstractTask
    return "index_hadoop";
  }
-  @Override
-  public boolean isReady(TaskActionClient taskActionClient) throws Exception
-  {
-    Optional<SortedSet<Interval>> intervals = schema.getGranularitySpec().bucketIntervals();
-    if (intervals.isPresent()) {
-      Interval interval = JodaUtils.umbrellaInterval(
-          JodaUtils.condenseIntervals(
-              intervals.get()
-          )
-      );
-      return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
-    } else {
-      return true;
-    }
-  }
  @JsonProperty("config")
  public HadoopDruidIndexerSchema getSchema()
  {
@ -171,60 +167,29 @@ public class HadoopIndexTask extends AbstractTask
    jobUrls.addAll(extensionURLs);
    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
-    boolean determineIntervals = !schema.getGranularitySpec().bucketIntervals().isPresent();
-    final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
-    final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
-        "runTask",
-        String[].class
-    );
-    String[] determineConfigArgs = new String[]{
+    final Class<?> mainClass = loader.loadClass(HadoopIndexTaskInnerProcessing.class.getName());
+    final Method mainMethod = mainClass.getMethod("runTask", String[].class);
+    // We should have a lock from before we started running
+    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    log.info("Setting version to: %s", myLock.getVersion());
+    String[] args = new String[]{
        toolbox.getObjectMapper().writeValueAsString(schema),
+        myLock.getVersion(),
        toolbox.getConfig().getHadoopWorkingPath(),
-        toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+        toolbox.getSegmentPusher().getPathForHadoop(getDataSource()),
    };
-    String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
-    HadoopDruidIndexerSchema indexerSchema = toolbox.getObjectMapper()
-                                                    .readValue(config, HadoopDruidIndexerSchema.class);
-    // We should have a lock from before we started running only if interval was specified
-    final String version;
-    if (determineIntervals) {
-      Interval interval = JodaUtils.umbrellaInterval(
-          JodaUtils.condenseIntervals(
-              indexerSchema.getGranularitySpec().bucketIntervals().get()
-          )
-      );
-      TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
-      version = lock.getVersion();
-    } else {
-      Iterable<TaskLock> locks = getTaskLocks(toolbox);
-      final TaskLock myLock = Iterables.getOnlyElement(locks);
-      version = myLock.getVersion();
-    }
-    log.info("Setting version to: %s", version);
-    final Class<?> indexGeneratorMainClass = loader.loadClass(HadoopIndexGeneratorInnerProcessing.class.getName());
-    final Method indexGeneratorMainMethod = indexGeneratorMainClass.getMethod("runTask", String[].class);
-    String[] indexGeneratorArgs = new String[]{
-        toolbox.getObjectMapper().writeValueAsString(indexerSchema),
-        version
-    };
-    String segments = (String) indexGeneratorMainMethod.invoke(null, new Object[]{indexGeneratorArgs});
+    String segments = (String) mainMethod.invoke(null, new Object[]{args});
    if (segments != null) {
      List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
          segments,
-          new TypeReference<List<DataSegment>>()
-          {
-          }
+          new TypeReference<List<DataSegment>>() {}
      );
      toolbox.pushSegments(publishedSegments);
      return TaskStatus.success(getId());
    } else {
@ -232,12 +197,14 @@ public class HadoopIndexTask extends AbstractTask
    }
  }
-  public static class HadoopIndexGeneratorInnerProcessing
+  public static class HadoopIndexTaskInnerProcessing
  {
    public static String runTask(String[] args) throws Exception
    {
      final String schema = args[0];
-      String version = args[1];
+      final String version = args[1];
+      final String workingPath = args[2];
+      final String segmentOutputPath = args[3];
      final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
          .readValue(
@ -247,6 +214,12 @@ public class HadoopIndexTask extends AbstractTask
      final HadoopDruidIndexerConfig config =
          new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
                                               .withVersion(version)
+                                               .withWorkingPath(
+                                                   workingPath
+                                               )
+                                               .withSegmentOutputPath(
+                                                   segmentOutputPath
+                                               )
                                               .build();
      HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
@ -259,34 +232,4 @@ public class HadoopIndexTask extends AbstractTask
      return null;
    }
  }
-  public static class HadoopDetermineConfigInnerProcessing
-  {
-    public static String runTask(String[] args) throws Exception
-    {
-      final String schema = args[0];
-      final String workingPath = args[1];
-      final String segmentOutputPath = args[2];
-      final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
-          .readValue(
-              schema,
-              HadoopDruidIndexerSchema.class
-          );
-      final HadoopDruidIndexerConfig config =
-          new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
-                                               .withWorkingPath(workingPath)
-                                               .withSegmentOutputPath(segmentOutputPath)
-                                               .build();
-      Jobby job = new HadoopDruidDetermineConfigurationJob(config);
-      log.info("Starting a hadoop index generator job...");
-      if (job.run()) {
-        return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
-      }
-      return null;
-    }
-  }
}
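
The single-stage flow restored above hands work to the isolated classloader through a positional String[] rather than typed arguments, so the packing in run() must stay in lock-step with the unpacking in HadoopIndexTaskInnerProcessing.runTask(). A sketch of that contract (names here are placeholders, not the actual fields):

class InnerProcessingArgsSketch
{
  // Mirrors the order produced in run(): schema JSON, lock version, working path, segment output path.
  // runTask() reads them back as args[0]..args[3] in the same order.
  static String[] pack(String schemaJson, String lockVersion, String workingPath, String segmentOutputPath)
  {
    return new String[]{schemaJson, lockVersion, workingPath, segmentOutputPath};
  }
}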

View File

@ -109,8 +109,8 @@ public class IndexTask extends AbstractFixedIntervalTask
        id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
        dataSource,
        new Interval(
-            granularitySpec.bucketIntervals().get().first().getStart(),
-            granularitySpec.bucketIntervals().get().last().getEnd()
+            granularitySpec.bucketIntervals().first().getStart(),
+            granularitySpec.bucketIntervals().last().getEnd()
        )
    );
@ -137,7 +137,7 @@ public class IndexTask extends AbstractFixedIntervalTask
    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
    final Set<DataSegment> segments = Sets.newHashSet();
-    final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals());
+    final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
    if (validIntervals.isEmpty()) {
      throw new ISE("No valid data intervals found. Check your configs!");
    }

View File

@ -382,9 +382,11 @@ public class TaskSerdeTest
    final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
    Assert.assertEquals("foo", task.getDataSource());
+    Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
    Assert.assertEquals(task.getId(), task2.getId());
    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+    Assert.assertEquals(task.getInterval(), task2.getInterval());
  }
}

View File

@ -30,7 +30,6 @@ import io.druid.guice.annotations.Self;
import io.druid.initialization.DruidModule;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig;
-import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
@ -64,8 +63,7 @@ public class ServerModule implements DruidModule
        .registerSubtypes(
            new NamedType(SingleDimensionShardSpec.class, "single"),
            new NamedType(LinearShardSpec.class, "linear"),
-            new NamedType(NumberedShardSpec.class, "numbered"),
-            new NamedType(HashBasedNumberedShardSpec.class, "hashed")
+            new NamedType(NumberedShardSpec.class, "numbered")
        )
    );
  }

View File

@ -1,77 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import java.util.List;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
private static final HashFunction hashFunction = Hashing.murmur3_32();
@JacksonInject
private ObjectMapper jsonMapper;
@JsonCreator
public HashBasedNumberedShardSpec(
@JsonProperty("partitionNum") int partitionNum,
@JsonProperty("partitions") int partitions
)
{
super(partitionNum, partitions);
}
@Override
public boolean isInChunk(InputRow inputRow)
{
return Math.abs(hash(inputRow)) % getPartitions() == getPartitionNum();
}
private int hash(InputRow inputRow)
{
final List<Object> groupKey = Rows.toGroupKey(inputRow.getTimestampFromEpoch(), inputRow);
try {
return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}
@Override
public String toString()
{
return "HashBasedNumberedShardSpec{" +
"partitionNum=" + getPartitionNum() +
", partitions=" + getPartitions() +
'}';
}
}
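
The removed shard spec above assigns each row to exactly one of N numbered shards by hashing its (timestamp, dimensions) group key and taking the value modulo the partition count. A stripped-down sketch of that assignment rule (illustrative hash values; the real class hashes the JSON-serialized group key with murmur3_32):

class HashPartitionAssignmentSketch
{
  // Same check as isInChunk(): a row lands in the shard whose partitionNum equals this value.
  static int assignedPartition(int rowHash, int partitions)
  {
    return Math.abs(rowHash) % partitions;
  }

  public static void main(String[] args)
  {
    System.out.println(assignedPartition(7, 3));   // 1
    System.out.println(assignedPartition(-8, 3));  // 2
    System.out.println(assignedPartition(36, 3));  // 0
  }
}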

View File

@ -19,12 +19,8 @@
package io.druid;
-import com.fasterxml.jackson.databind.BeanProperty;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.metamx.common.ISE;
import io.druid.guice.ServerModule;
import io.druid.jackson.DefaultObjectMapper;
@ -41,20 +37,5 @@ public class TestUtil
    for (Module module : list) {
      MAPPER.registerModule(module);
    }
-    MAPPER.setInjectableValues(
-        new InjectableValues()
-        {
-          @Override
-          public Object findInjectableValue(
-              Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
-          )
-          {
-            if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
-              return TestUtil.MAPPER;
-            }
-            throw new ISE("No Injectable value found");
-          }
-        }
-    );
  }
}
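
The InjectableValues block removed above existed to satisfy @JacksonInject ObjectMapper fields during tests, such as the jsonMapper field in the removed HashBasedNumberedShardSpec. A roughly equivalent, shorter form using Jackson's stock InjectableValues.Std is sketched below (illustrative only; the anonymous mapper stands in for whatever mapper a test actually wires up):

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

class InjectableMapperSketch
{
  public static void main(String[] args)
  {
    ObjectMapper mapper = new ObjectMapper();

    // Registers the mapper under its class name, which is the id an un-keyed
    // @JacksonInject ObjectMapper field resolves against.
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
    );
  }
}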

View File

@ -1,110 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.shard;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.TestUtil;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.ShardSpec;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class HashBasedNumberedShardSpecTest
{
@Test
public void testSerdeRoundTrip() throws Exception
{
final ShardSpec spec = TestUtil.MAPPER.readValue(
TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2)),
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
}
@Test
public void testSerdeBackwardsCompat() throws Exception
{
final ShardSpec spec = TestUtil.MAPPER.readValue(
"{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1}",
ShardSpec.class
);
Assert.assertEquals(1, spec.getPartitionNum());
Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
}
@Test
public void testPartitionChunks()
{
final List<ShardSpec> specs = ImmutableList.<ShardSpec>of(
new HashBasedNumberedShardSpec(0, 3),
new HashBasedNumberedShardSpec(1, 3),
new HashBasedNumberedShardSpec(2, 3)
);
final List<PartitionChunk<String>> chunks = Lists.transform(
specs,
new Function<ShardSpec, PartitionChunk<String>>()
{
@Override
public PartitionChunk<String> apply(ShardSpec shardSpec)
{
return shardSpec.createChunk("rofl");
}
}
);
Assert.assertEquals(0, chunks.get(0).getChunkNumber());
Assert.assertEquals(1, chunks.get(1).getChunkNumber());
Assert.assertEquals(2, chunks.get(2).getChunkNumber());
Assert.assertTrue(chunks.get(0).isStart());
Assert.assertFalse(chunks.get(1).isStart());
Assert.assertFalse(chunks.get(2).isStart());
Assert.assertFalse(chunks.get(0).isEnd());
Assert.assertFalse(chunks.get(1).isEnd());
Assert.assertTrue(chunks.get(2).isEnd());
Assert.assertTrue(chunks.get(0).abuts(chunks.get(1)));
Assert.assertTrue(chunks.get(1).abuts(chunks.get(2)));
Assert.assertFalse(chunks.get(0).abuts(chunks.get(0)));
Assert.assertFalse(chunks.get(0).abuts(chunks.get(2)));
Assert.assertFalse(chunks.get(1).abuts(chunks.get(0)));
Assert.assertFalse(chunks.get(1).abuts(chunks.get(1)));
Assert.assertFalse(chunks.get(2).abuts(chunks.get(0)));
Assert.assertFalse(chunks.get(2).abuts(chunks.get(1)));
Assert.assertFalse(chunks.get(2).abuts(chunks.get(2)));
}
}