correct locking and partitionsSpec

nishantmonu51 2014-02-05 03:17:47 +05:30
parent 569452121e
commit bacc72415f
23 changed files with 538 additions and 320 deletions

View File: DetermineHashedPartitionsJob.java

@@ -24,9 +24,7 @@ import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
@@ -35,6 +33,7 @@ import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
+import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -68,8 +67,8 @@ public class DetermineHashedPartitionsJob implements Jobby
{
private static final int MAX_SHARDS = 128;
private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
-private final HadoopDruidIndexerConfig config;
private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
+private final HadoopDruidIndexerConfig config;
public DetermineHashedPartitionsJob(
HadoopDruidIndexerConfig config
@@ -121,7 +120,7 @@ public class DetermineHashedPartitionsJob implements Jobby
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
-if (config.getSegmentGranularIntervals().isEmpty()) {
+if (!config.getSegmentGranularIntervals().isPresent()) {
final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!fileSystem.exists(intervalInfoPath)) {
@@ -137,7 +136,7 @@ public class DetermineHashedPartitionsJob implements Jobby
}
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
-for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
+for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
@@ -204,17 +203,17 @@ public class DetermineHashedPartitionsJob implements Jobby
super.setup(context);
rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
-if (config.getSegmentGranularIntervals().isEmpty()) {
-determineIntervals = true;
-hyperLogLogs = Maps.newHashMap();
-} else {
+Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
+if (intervals.isPresent()) {
determineIntervals = false;
final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
-for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
+for (final Interval bucketInterval : intervals.get()) {
builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
}
hyperLogLogs = builder.build();
+} else {
+determineIntervals = true;
+hyperLogLogs = Maps.newHashMap();
}
}
@@ -225,18 +224,10 @@ public class DetermineHashedPartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
-// Create group key, there are probably more efficient ways of doing this
-final Map<String, Set<String>> dims = Maps.newTreeMap();
-for (final String dim : inputRow.getDimensions()) {
-final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
-if (dimValues.size() > 0) {
-dims.put(dim, dimValues);
-}
-}
-final List<Object> groupKey = ImmutableList.of(
+final List<Object> groupKey = Rows.toGroupKey(
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
-dims
+inputRow
);
Interval interval;
if (determineIntervals) {
@@ -338,7 +329,7 @@ public class DetermineHashedPartitionsJob implements Jobby
throws IOException, InterruptedException
{
super.run(context);
-if (config.getSegmentGranularIntervals().isEmpty()) {
+if (!config.getSegmentGranularIntervals().isPresent()) {
final Path outPath = config.makeIntervalInfoPath();
final OutputStream out = Utils.makePathAndOutputStream(
context, outPath, config.isOverwriteFiles()
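The group-key construction removed here (and in DeterminePartitionsJob and HashBasedNumberedShardSpec below) is centralized in Rows.toGroupKey. A minimal sketch of the equivalent logic, assuming the helper simply mirrors the inline code it replaces (GroupKeySketch is a hypothetical name, not the actual io.druid.data.input.Rows source):

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableSortedSet;
    import com.google.common.collect.Maps;
    import io.druid.data.input.InputRow;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Sketch only: a group key is the (truncated) timestamp plus a sorted map of
    // dimension -> sorted, de-duplicated values, exactly as in the removed code above.
    public class GroupKeySketch
    {
      public static List<Object> toGroupKey(long timestamp, InputRow inputRow)
      {
        final Map<String, Set<String>> dims = Maps.newTreeMap();
        for (final String dim : inputRow.getDimensions()) {
          final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
          if (!dimValues.isEmpty()) {
            dims.put(dim, dimValues);
          }
        }
        return ImmutableList.<Object>of(timestamp, dims);
      }
    }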

View File: DeterminePartitionsJob.java

@@ -26,9 +26,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -40,7 +38,9 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.common.logger.Logger;
import io.druid.collections.CombiningIterable;
import io.druid.data.input.InputRow;
+import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
+import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
@@ -76,7 +76,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
 * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
@@ -115,6 +114,10 @@ public class DeterminePartitionsJob implements Jobby
 * in the final segment.
 */
+if(!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)){
+throw new ISE("DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]", config.getPartitionsSpec());
+}
if (!config.getPartitionsSpec().isAssumeGrouped()) {
final Job groupByJob = new Job(
new Configuration(),
@@ -180,7 +183,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
-dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().size());
+dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
JobHelper.setupClasspath(config, dimSelectionJob);
config.intoConfiguration(dimSelectionJob);
@@ -206,7 +209,7 @@ public class DeterminePartitionsJob implements Jobby
FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
-for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
+for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
@@ -257,17 +260,9 @@ public class DeterminePartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
-// Create group key, there are probably more efficient ways of doing this
-final Map<String, Set<String>> dims = Maps.newTreeMap();
-for (final String dim : inputRow.getDimensions()) {
-final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
-if (dimValues.size() > 0) {
-dims.put(dim, dimValues);
-}
-}
-final List<Object> groupKey = ImmutableList.of(
+final List<Object> groupKey = Rows.toGroupKey(
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
-dims
+inputRow
);
context.write(
new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)),
@@ -303,8 +298,8 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
-final String partitionDimension = config.getPartitionDimension();
-helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
+SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
+helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
}
@Override
@@ -335,8 +330,8 @@ public class DeterminePartitionsJob implements Jobby
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
-final String partitionDimension = config.getPartitionDimension();
-helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
+final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
+helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
}
@Override
@@ -371,7 +366,7 @@ public class DeterminePartitionsJob implements Jobby
final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
-for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
+for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) {
timeIndexBuilder.put(bucketInterval.getStart(), idx);
idx++;
}

View File: HadoopDruidDetermineConfigurationJob.java (new file)

@@ -0,0 +1,74 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopDruidDetermineConfigurationJob implements Jobby
{
private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
private final HadoopDruidIndexerConfig config;
@Inject
public HadoopDruidDetermineConfigurationJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
}
@Override
public boolean run()
{
List<Jobby> jobs = Lists.newArrayList();
JobHelper.ensurePaths(config);
if (config.isDeterminingPartitions()) {
jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else {
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
config.setShardSpecs(shardSpecs);
}
return JobHelper.runJobs(jobs, config);
}
}
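With this new class, determining shard specs and generating the index are two separately runnable Jobby instances. A rough sketch of the intended flow, under the assumption that both jobs can share one HadoopDruidIndexerConfig within the same process (TwoPhaseIndexingSketch and argSpec are hypothetical names, not part of this commit):

    import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
    import io.druid.indexer.HadoopDruidIndexerConfig;
    import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
    import io.druid.indexer.HadoopDruidIndexerJob;
    import java.util.Map;

    public class TwoPhaseIndexingSketch
    {
      public static boolean runTwoPhase(Map<String, Object> argSpec)
      {
        HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromMap(argSpec);
        // Phase 1: determine intervals/shard specs (written back into the config).
        if (!new HadoopDruidDetermineConfigurationJob(config).run()) {
          return false;
        }
        // Phase 2: build the segments using the configuration determined above.
        return new HadoopDruidIndexerJob(config).run();
      }
    }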

View File: HadoopDruidIndexerConfig.java

@@ -50,6 +50,7 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
+import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
@@ -73,6 +74,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
/**
 */
@@ -178,7 +180,7 @@ public class HadoopDruidIndexerConfig
this.partitionsSpec = partitionsSpec;
} else {
// Backwards compatibility
-this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
+this.partitionsSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if (granularitySpec != null) {
@@ -378,14 +380,14 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
-public List<Interval> getIntervals()
+public Optional<List<Interval>> getIntervals()
{
-return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
-}
-public String getPartitionDimension()
-{
-return partitionsSpec.getPartitionDimension();
+Optional<SortedSet<Interval>> setOptional = getGranularitySpec().bucketIntervals();
+if (setOptional.isPresent()) {
+return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
+} else {
+return Optional.absent();
+}
}
public boolean isDeterminingPartitions()
@@ -483,55 +485,63 @@ public class HadoopDruidIndexerConfig
throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
}
-public Set<Interval> getSegmentGranularIntervals()
+public Optional<Set<Interval>> getSegmentGranularIntervals()
{
-return granularitySpec.bucketIntervals();
+return Optional.fromNullable((Set<Interval>) granularitySpec.bucketIntervals().orNull());
}
-public Iterable<Bucket> getAllBuckets()
+public Optional<Iterable<Bucket>> getAllBuckets()
{
-return FunctionalIterable
-.create(getSegmentGranularIntervals())
-.transformCat(
+Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
+if (intervals.isPresent()) {
+return Optional.of(
+(Iterable<Bucket>) FunctionalIterable
+.create(intervals.get())
+.transformCat(
new Function<Interval, Iterable<Bucket>>()
{
@Override
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime);
if (specs == null) {
return ImmutableList.of();
}
return FunctionalIterable
.create(specs)
.transform(
new Function<HadoopyShardSpec, Bucket>()
{
int i = 0;
@Override
public Bucket apply(HadoopyShardSpec input)
{
return new Bucket(input.getShardNum(), bucketTime, i++);
}
}
);
}
}
-);
+)
+);
+} else {
+return Optional.absent();
+}
}
/******************************************
 Path helper logic
 ******************************************/
/**
 * Make the intermediate path for this job run.
 *
 * @return the intermediate path for this job run.
 */
public Path makeIntermediatePath()
{
return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", "")));
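getIntervals(), getSegmentGranularIntervals() and getAllBuckets() now return Optional values, so callers must handle the interval-less case explicitly. A minimal sketch of a caller (scheduleBuckets is a hypothetical helper; assumes com.google.common.base.Optional, org.joda.time.Interval and java.util.Set are imported):

    // Sketch: iterate buckets only when intervals are known up front.
    static void scheduleBuckets(HadoopDruidIndexerConfig config)
    {
      Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
      if (!intervals.isPresent()) {
        // Intervals are not known yet; DetermineHashedPartitionsJob discovers them from
        // the data and records them under config.makeIntervalInfoPath().
        return;
      }
      for (Interval segmentGranularity : intervals.get()) {
        // one list of HadoopyShardSpecs per segment-granular interval
      }
    }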

View File: HadoopDruidIndexerConfigBuilder.java

@@ -46,6 +46,10 @@ public class HadoopDruidIndexerConfigBuilder
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
}
+public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config){
+return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopDruidIndexerSchema.class);
+}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);

View File: HadoopDruidIndexerJob.java

@@ -19,34 +19,20 @@
package io.druid.indexer;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.inject.Inject;
-import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Interval;
-import java.io.IOException;
import java.util.List;
-import java.util.Map;
/**
 */
public class HadoopDruidIndexerJob implements Jobby
{
private static final Logger log = new Logger(HadoopDruidIndexerJob.class);
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob;
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
@@ -69,27 +55,7 @@ public class HadoopDruidIndexerJob implements Jobby
public boolean run()
{
List<Jobby> jobs = Lists.newArrayList();
-ensurePaths();
-if (config.isDeterminingPartitions()) {
-if(config.getPartitionDimension() == null){
-jobs.add(new DetermineHashedPartitionsJob(config));
-} else {
-jobs.add(new DeterminePartitionsJob(config));
-}
-}
-else {
-Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
-int shardCount = 0;
-for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
-DateTime bucket = segmentGranularity.getStart();
-final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
-shardSpecs.put(bucket, Lists.newArrayList(spec));
-log.info("DateTime[%s], spec[%s]", bucket, spec);
-}
-config.setShardSpecs(shardSpecs);
-}
+JobHelper.ensurePaths(config);
indexJob = new IndexGeneratorJob(config);
jobs.add(indexJob);
@@ -100,60 +66,15 @@ public class HadoopDruidIndexerJob implements Jobby
log.info("No updaterJobSpec set, not uploading to database");
}
-String failedMessage = null;
-for (Jobby job : jobs) {
-if (failedMessage == null) {
-if (!job.run()) {
-failedMessage = String.format("Job[%s] failed!", job.getClass());
-}
-}
-}
-if (failedMessage == null) {
-publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
-}
-if (!config.isLeaveIntermediate()) {
-if (failedMessage == null || config.isCleanupOnFailure()) {
-Path workingPath = config.makeIntermediatePath();
-log.info("Deleting path[%s]", workingPath);
-try {
-workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
-}
-catch (IOException e) {
-log.error(e, "Failed to cleanup path[%s]", workingPath);
-}
-}
-}
-if (failedMessage != null) {
-throw new ISE(failedMessage);
-}
+JobHelper.runJobs(jobs, config);
+publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
return true;
}
-private void ensurePaths()
+public List<DataSegment> getPublishedSegments()
{
-// config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else
-try {
-Job job = new Job(
-new Configuration(),
-String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
-);
-job.getConfiguration().set("io.sort.record.percent", "0.19");
-JobHelper.injectSystemProperties(job);
-config.addInputPaths(job);
-}
-catch (IOException e) {
-throw Throwables.propagate(e);
-}
-}
-public List<DataSegment> getPublishedSegments() {
-if(publishedSegments == null) {
+if (publishedSegments == null) {
throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
}
return publishedSegments;

View File: HadoopDruidIndexerMapper.java

@@ -22,6 +22,7 @@ package io.druid.indexer;
import com.metamx.common.RE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.indexer.granularity.GranularitySpec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
@@ -70,10 +71,9 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
throw e;
}
}
-if (config.getGranularitySpec().bucketIntervals().isEmpty() || config.getGranularitySpec()
-.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
-.isPresent()) {
+GranularitySpec spec = config.getGranularitySpec();
+if (!spec.bucketIntervals().isPresent() || spec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
+.isPresent()) {
innerMap(inputRow, value, context);
}
}

View File: IndexGeneratorJob.java

@@ -153,7 +153,7 @@ public class IndexGeneratorJob implements Jobby
SortableBytes.useSortableBytesAsMapOutputKey(job);
-job.setNumReduceTasks(Iterables.size(config.getAllBuckets()));
+job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
job.setPartitionerClass(IndexGeneratorPartitioner.class);
job.setReducerClass(IndexGeneratorReducer.class);

View File: JobHelper.java

@@ -20,9 +20,11 @@
package io.druid.indexer;
import com.google.api.client.util.Sets;
+import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
+import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -34,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.List;
import java.util.Set;
/**
@@ -104,4 +107,53 @@
}
}
}
+public static void ensurePaths(HadoopDruidIndexerConfig config)
+{
+// config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else
+try {
+Job job = new Job(
+new Configuration(),
+String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
+);
+job.getConfiguration().set("io.sort.record.percent", "0.19");
+injectSystemProperties(job);
+config.addInputPaths(job);
+}
+catch (IOException e) {
+throw Throwables.propagate(e);
+}
+}
+public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config){
+String failedMessage = null;
+for (Jobby job : jobs) {
+if (failedMessage == null) {
+if (!job.run()) {
+failedMessage = String.format("Job[%s] failed!", job.getClass());
+}
+}
+}
+if (!config.isLeaveIntermediate()) {
+if (failedMessage == null || config.isCleanupOnFailure()) {
+Path workingPath = config.makeIntermediatePath();
+log.info("Deleting path[%s]", workingPath);
+try {
+workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
+}
+catch (IOException e) {
+log.error(e, "Failed to cleanup path[%s]", workingPath);
+}
+}
+}
+if (failedMessage != null) {
+throw new ISE(failedMessage);
+}
+return true;
+}
}

View File: ArbitraryGranularitySpec.java

@@ -72,9 +72,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
@Override
@JsonProperty("intervals")
-public SortedSet<Interval> bucketIntervals()
+public Optional<SortedSet<Interval>> bucketIntervals()
{
-return intervals;
+return Optional.of((SortedSet<Interval>) intervals);
}
@Override

View File: GranularitySpec.java

@@ -40,7 +40,7 @@ import java.util.SortedSet;
public interface GranularitySpec
{
/** Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.*/
-public SortedSet<Interval> bucketIntervals();
+public Optional<SortedSet<Interval>> bucketIntervals();
/** Time-grouping interval corresponding to some instant, if any. */
public Optional<Interval> bucketInterval(DateTime dt);

View File: UniformGranularitySpec.java

@@ -29,14 +29,13 @@ import com.metamx.common.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
-import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
public class UniformGranularitySpec implements GranularitySpec
{
final private Granularity granularity;
-final private List<Interval> inputIntervals;
+final private Iterable<Interval> inputIntervals;
final private ArbitraryGranularitySpec wrappedSpec;
@JsonCreator
@@ -45,23 +44,28 @@ public class UniformGranularitySpec implements GranularitySpec
@JsonProperty("intervals") List<Interval> inputIntervals
)
{
-List<Interval> granularIntervals = Lists.newArrayList();
+this.granularity = granularity;
if (inputIntervals != null) {
+List<Interval> granularIntervals = Lists.newArrayList();
for (Interval inputInterval : inputIntervals) {
Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
}
+this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
+} else {
+this.inputIntervals = null;
+this.wrappedSpec = null;
}
-this.granularity = granularity;
-this.inputIntervals = inputIntervals == null ? Collections.EMPTY_LIST : ImmutableList.copyOf(inputIntervals);
-this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
}
@Override
-public SortedSet<Interval> bucketIntervals()
+public Optional<SortedSet<Interval>> bucketIntervals()
{
-return wrappedSpec.bucketIntervals();
+if (wrappedSpec == null) {
+return Optional.absent();
+} else {
+return wrappedSpec.bucketIntervals();
+}
}
@Override
@@ -78,8 +82,8 @@ public class UniformGranularitySpec implements GranularitySpec
}
@JsonProperty("intervals")
-public Iterable<Interval> getIntervals()
+public Optional<Iterable<Interval>> getIntervals()
{
-return inputIntervals;
+return Optional.fromNullable(inputIntervals);
}
}
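A small sketch of the new behaviour, assuming the constructor argument order (granularity, intervals) implied above and imports for com.metamx.common.Granularity, com.google.common.collect.Lists and org.joda.time.Interval:

    // No intervals configured: bucketIntervals() is now absent instead of empty.
    UniformGranularitySpec unknown = new UniformGranularitySpec(Granularity.DAY, null);
    // unknown.bucketIntervals().isPresent() == false

    // Two days' worth of configured intervals: one daily bucket per day.
    UniformGranularitySpec daily = new UniformGranularitySpec(
        Granularity.DAY,
        Lists.newArrayList(new Interval("2012-01-01/P2D"))
    );
    // daily.bucketIntervals().get().size() == 2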

View File: AbstractPartitionsSpec.java (new file)

@@ -0,0 +1,68 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonProperty;
public abstract class AbstractPartitionsSpec implements PartitionsSpec
{
private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
private final long targetPartitionSize;
private final long maxPartitionSize;
private final boolean assumeGrouped;
public AbstractPartitionsSpec(
Long targetPartitionSize,
Long maxPartitionSize,
Boolean assumeGrouped
)
{
this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
this.maxPartitionSize = maxPartitionSize == null
? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
: maxPartitionSize;
this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
}
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public long getMaxPartitionSize()
{
return maxPartitionSize;
}
@JsonProperty
public boolean isAssumeGrouped()
{
return assumeGrouped;
}
@Override
public boolean isDeterminingPartitions()
{
return targetPartitionSize > 0;
}
}
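For illustration, the defaulting in AbstractPartitionsSpec means a spec built with only a target size fills in the rest (a sketch; RandomPartitionsSpec is the subclass added later in this commit):

    RandomPartitionsSpec spec = new RandomPartitionsSpec(1000000L, null, null);
    // spec.getTargetPartitionSize()  == 1000000
    // spec.getMaxPartitionSize()     == 1500000   (1.5x the target when not set)
    // spec.isAssumeGrouped()         == false
    // spec.isDeterminingPartitions() == true      (targetPartitionSize > 0)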

View File: PartitionsSpec.java

@@ -19,69 +19,33 @@
package io.druid.indexer.partitions;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.indexer.HadoopDruidIndexerConfig;
+import io.druid.indexer.Jobby;
-import javax.annotation.Nullable;
-public class PartitionsSpec
-{
-private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
-@Nullable
-private final String partitionDimension;
-private final long targetPartitionSize;
-private final long maxPartitionSize;
-private final boolean assumeGrouped;
-@JsonCreator
-public PartitionsSpec(
-@JsonProperty("partitionDimension") @Nullable String partitionDimension,
-@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
-@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
-)
-{
-this.partitionDimension = partitionDimension;
-this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
-this.maxPartitionSize = maxPartitionSize == null
-? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
-: maxPartitionSize;
-this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
-}
-@JsonIgnore
-public boolean isDeterminingPartitions()
-{
-return targetPartitionSize > 0;
-}
-@JsonProperty
-@Nullable
-public String getPartitionDimension()
-{
-return partitionDimension;
-}
-@JsonProperty
-public long getTargetPartitionSize()
-{
-return targetPartitionSize;
-}
-@JsonProperty
-public long getMaxPartitionSize()
-{
-return maxPartitionSize;
-}
-@JsonProperty
-public boolean isAssumeGrouped()
-{
-return assumeGrouped;
-}
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
+@JsonSubTypes(value = {
+@JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
+@JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class)
+})
+public interface PartitionsSpec
+{
+@JsonIgnore
+public Jobby getPartitionJob(HadoopDruidIndexerConfig config);
+@JsonProperty
+public long getTargetPartitionSize();
+@JsonProperty
+public long getMaxPartitionSize();
+@JsonProperty
+public boolean isAssumeGrouped();
+@JsonIgnore
+public boolean isDeterminingPartitions();
}
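The @JsonTypeInfo/@JsonSubTypes registration above makes partitionsSpec polymorphic in the job spec JSON, with the old dimension-based spec as the default when no type is given. A sketch of how it is expected to deserialize (PartitionsSpecJsonSketch is a hypothetical class name; the type names come from the registration above):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.indexer.partitions.PartitionsSpec;
    import io.druid.indexer.partitions.RandomPartitionsSpec;
    import io.druid.jackson.DefaultObjectMapper;

    public class PartitionsSpecJsonSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper jsonMapper = new DefaultObjectMapper();

        // "type": "random" -> RandomPartitionsSpec (hashed/random partitioning).
        PartitionsSpec random = jsonMapper.readValue(
            "{\"type\": \"random\", \"targetPartitionSize\": 100}",
            PartitionsSpec.class
        );

        // No "type" property -> defaultImpl, i.e. SingleDimensionPartitionsSpec.
        PartitionsSpec legacy = jsonMapper.readValue(
            "{\"partitionDimension\": \"foo\", \"targetPartitionSize\": 100}",
            PartitionsSpec.class
        );

        System.out.println(random instanceof RandomPartitionsSpec); // true
        System.out.println(legacy.getClass().getSimpleName());      // SingleDimensionPartitionsSpec
      }
    }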

View File: RandomPartitionsSpec.java (new file)

@@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DetermineHashedPartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class RandomPartitionsSpec extends AbstractPartitionsSpec
{
@JsonCreator
public RandomPartitionsSpec(
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DetermineHashedPartitionsJob(config);
}
}

View File: SingleDimensionPartitionsSpec.java (new file)

@@ -0,0 +1,60 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer.partitions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexer.DeterminePartitionsJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import javax.annotation.Nullable;
public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
{
@Nullable
private final String partitionDimension;
@JsonCreator
public SingleDimensionPartitionsSpec(
@JsonProperty("partitionDimension") @Nullable String partitionDimension,
@JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
@JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
@JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
)
{
super(targetPartitionSize, maxPartitionSize, assumeGrouped);
this.partitionDimension = partitionDimension;
}
@JsonProperty
@Nullable
public String getPartitionDimension()
{
return partitionDimension;
}
@Override
public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
{
return new DeterminePartitionsJob(config);
}
}

View File: GranularityPathSpec.java

@@ -20,6 +20,7 @@
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
@@ -99,9 +100,12 @@ public class GranularityPathSpec implements PathSpec
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
{
final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervals());
-for (Interval segmentInterval : config.getSegmentGranularIntervals()) {
-for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
-intervals.add(dataInterval);
+Optional<Set<Interval>> optionalIntervals = config.getSegmentGranularIntervals();
+if (optionalIntervals.isPresent()) {
+for (Interval segmentInterval : optionalIntervals.get()) {
+for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
+intervals.add(dataInterval);
+}
}
}

View File: HadoopDruidIndexerConfigTest.java

@@ -25,6 +25,8 @@ import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
+import io.druid.indexer.partitions.RandomPartitionsSpec;
+import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -65,7 +67,7 @@ public class HadoopDruidIndexerConfigTest
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-01-01/P1D")),
-granularitySpec.getIntervals()
+granularitySpec.getIntervals().get()
);
Assert.assertEquals(
@@ -99,7 +101,7 @@ public class HadoopDruidIndexerConfigTest
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-02-01/P1D")),
-granularitySpec.getIntervals()
+granularitySpec.getIntervals().get()
);
Assert.assertEquals(
@@ -167,15 +169,14 @@ public class HadoopDruidIndexerConfigTest
100
);
-Assert.assertEquals(
-"getPartitionDimension",
-partitionsSpec.getPartitionDimension(),
-null
+Assert.assertTrue(
+"partitionSpec",
+partitionsSpec instanceof SingleDimensionPartitionsSpec
);
}
@Test
-public void testPartitionsSpecSpecificDimension()
+public void testPartitionsSpecSpecificDimensionLegacy()
{
final HadoopDruidIndexerConfig cfg;
@@ -214,9 +215,10 @@ public class HadoopDruidIndexerConfigTest
150
);
+Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
-partitionsSpec.getPartitionDimension(),
+((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@@ -259,9 +261,10 @@ public class HadoopDruidIndexerConfigTest
150
);
+Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
-partitionsSpec.getPartitionDimension(),
+((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@@ -307,9 +310,10 @@ public class HadoopDruidIndexerConfigTest
200
);
+Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
-partitionsSpec.getPartitionDimension(),
+((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
"foo"
);
}
@@ -420,7 +424,6 @@ public class HadoopDruidIndexerConfigTest
);
}
-
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{

View File: ArbitraryGranularityTest.java

@@ -54,7 +54,7 @@ public class ArbitraryGranularityTest
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-02-01T00Z/2012-03-01T00Z")
),
-Lists.newArrayList(spec.bucketIntervals())
+Lists.newArrayList(spec.bucketIntervals().get())
);
Assert.assertEquals(

View File: UniformGranularityTest.java

@@ -57,7 +57,7 @@ public class UniformGranularityTest
new Interval("2012-01-09T00Z/P1D"),
new Interval("2012-01-10T00Z/P1D")
),
-Lists.newArrayList(spec.bucketIntervals())
+Lists.newArrayList(spec.bucketIntervals().get())
);
Assert.assertEquals(

View File: HadoopIndexTask.java

@@ -25,17 +25,21 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
+import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
+import io.druid.indexer.Jobby;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.initialization.Initialization;
@@ -51,6 +55,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
+import java.util.SortedSet;
public class HadoopIndexTask extends AbstractTask
{
@@ -107,11 +112,11 @@ public class HadoopIndexTask extends AbstractTask
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
-if (!schema.getGranularitySpec().bucketIntervals().isEmpty()) {
+Optional<SortedSet<Interval>> intervals = schema.getGranularitySpec().bucketIntervals();
+if (intervals.isPresent()) {
Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
-schema.getGranularitySpec()
-.bucketIntervals()
+intervals.get()
)
);
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
@@ -166,31 +171,49 @@ public class HadoopIndexTask extends AbstractTask
jobUrls.addAll(extensionURLs);
System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
-final Class<?> mainClass = loader.loadClass(HadoopIndexTaskInnerProcessing.class.getName());
-final Method mainMethod = mainClass.getMethod("runTask", String[].class);
+boolean determineIntervals = !schema.getGranularitySpec().bucketIntervals().isPresent();
+final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
+final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
+"runTask",
+String[].class
+);
+String[] determineConfigArgs = new String[]{
+toolbox.getObjectMapper().writeValueAsString(schema),
+toolbox.getConfig().getHadoopWorkingPath(),
+toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+};
+String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
+HadoopDruidIndexerSchema indexerSchema = toolbox.getObjectMapper()
+.readValue(config, HadoopDruidIndexerSchema.class);
// We should have a lock from before we started running only if interval was specified
-boolean determineIntervals = schema.getGranularitySpec().bucketIntervals().isEmpty();
final String version;
if (determineIntervals) {
-version = new DateTime().toString();
+Interval interval = JodaUtils.umbrellaInterval(
+JodaUtils.condenseIntervals(
+indexerSchema.getGranularitySpec().bucketIntervals().get()
+)
+);
+TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
+version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox);
final TaskLock myLock = Iterables.getOnlyElement(locks);
version = myLock.getVersion();
-log.info("Setting version to: %s", myLock.getVersion());
}
+log.info("Setting version to: %s", version);
+final Class<?> indexGeneratorMainClass = loader.loadClass(HadoopIndexGeneratorInnerProcessing.class.getName());
+final Method indexGeneratorMainMethod = indexGeneratorMainClass.getMethod("runTask", String[].class);
-String[] args = new String[]{
-toolbox.getObjectMapper().writeValueAsString(schema),
-version,
-toolbox.getConfig().getHadoopWorkingPath(),
-toolbox.getSegmentPusher().getPathForHadoop(getDataSource()),
+String[] indexGeneratorArgs = new String[]{
+toolbox.getObjectMapper().writeValueAsString(indexerSchema),
+version
};
-String segments = (String) mainMethod.invoke(null, new Object[]{args});
+String segments = (String) indexGeneratorMainMethod.invoke(null, new Object[]{indexGeneratorArgs});
if (segments != null) {
@@ -202,15 +225,6 @@ public class HadoopIndexTask extends AbstractTask
}
);
-// What if we cannot take the lock ??
-if (determineIntervals) {
-List<Interval> intervals = Lists.newArrayList();
-for (DataSegment segment : publishedSegments) {
-intervals.add(segment.getInterval());
-}
-Interval interval = JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(intervals));
-toolbox.getTaskActionClient().submit(new LockTryAcquireAction(interval));
-}
toolbox.pushSegments(publishedSegments);
return TaskStatus.success(getId());
} else {
@@ -218,14 +232,12 @@ public class HadoopIndexTask extends AbstractTask
}
}
-public static class HadoopIndexTaskInnerProcessing
+public static class HadoopIndexGeneratorInnerProcessing
{
public static String runTask(String[] args) throws Exception
{
final String schema = args[0];
-final String version = args[1];
-final String workingPath = args[2];
-final String segmentOutputPath = args[3];
+String version = args[1];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
@@ -235,12 +247,6 @@ public class HadoopIndexTask extends AbstractTask
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withVersion(version)
-.withWorkingPath(
-workingPath
-)
-.withSegmentOutputPath(
-segmentOutputPath
-)
.build();
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
@@ -253,4 +259,34 @@ public class HadoopIndexTask extends AbstractTask
return null;
}
}
+public static class HadoopDetermineConfigInnerProcessing
+{
+public static String runTask(String[] args) throws Exception
+{
+final String schema = args[0];
+final String workingPath = args[1];
+final String segmentOutputPath = args[2];
+final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
+.readValue(
+schema,
+HadoopDruidIndexerSchema.class
+);
+final HadoopDruidIndexerConfig config =
+new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
+.withWorkingPath(workingPath)
+.withSegmentOutputPath(segmentOutputPath)
+.build();
+Jobby job = new HadoopDruidDetermineConfigurationJob(config);
+log.info("Starting a hadoop index generator job...");
+if (job.run()) {
+return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
+}
+return null;
+}
+}
}

View File: IndexTask.java

@@ -109,8 +109,8 @@ public class IndexTask extends AbstractFixedIntervalTask
id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
dataSource,
new Interval(
-granularitySpec.bucketIntervals().first().getStart(),
-granularitySpec.bucketIntervals().last().getEnd()
+granularitySpec.bucketIntervals().get().first().getStart(),
+granularitySpec.bucketIntervals().get().last().getEnd()
)
);
@@ -137,7 +137,7 @@ public class IndexTask extends AbstractFixedIntervalTask
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
-final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
+final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals());
if (validIntervals.isEmpty()) {
throw new ISE("No valid data intervals found. Check your configs!");
}

View File: HashBasedNumberedShardSpec.java

@@ -25,16 +25,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.data.input.InputRow;
+import io.druid.data.input.Rows;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
@@ -60,18 +56,7 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
protected int hash(InputRow inputRow)
{
-final Map<String, Set<String>> dims = Maps.newTreeMap();
-for (final String dim : inputRow.getDimensions()) {
-final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
-if (dimValues.size() > 0) {
-dims.put(dim, dimValues);
-}
-}
-final List<Object> groupKey = ImmutableList.of(
-inputRow.getTimestampFromEpoch(),
-dims
-);
+final List<Object> groupKey = Rows.toGroupKey(inputRow.getTimestampFromEpoch(), inputRow);
try {
return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
}