diff --git a/docs/content/Batch-ingestion.md b/docs/content/Batch-ingestion.md
index 125e15c716e..534e04fdb42 100644
--- a/docs/content/Batch-ingestion.md
+++ b/docs/content/Batch-ingestion.md
@@ -82,7 +82,6 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"segmentOutputPath": "s3n:\/\/billy-bucket\/the\/segments\/go\/here",
"leaveIntermediate": "false",
"partitionsSpec": {
- "type": "random"
"targetPartitionSize": 5000000
},
"updaterJobSpec": {
@@ -146,20 +145,12 @@ The indexing process has the ability to roll data up as it processes the incomin
### Partitioning specification
-Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way depending on partition type.
-Druid supports two types of partitions spec - singleDimension and random.
-
-In SingleDimension partition type data is partitioned based on the values in that dimension.
-For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
-
-In random partition type, the number of partitions is determined based on the targetPartitionSize and cardinality of input set and the data is partitioned based on the hashcode of the row.
-Random partition type is more efficient and gives better distribution of data.
+Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in some other way. For example, data for a day may be split by the dimension "last\_name" into two segments: one with all values from A-M and one with all values from N-Z.
To use this option, the indexer must be given a target partition size. It can then find a good set of partition ranges on its own.
|property|description|required?|
|--------|-----------|---------|
-|type|type of partitionSpec to be used |no, default : singleDimension|
|targetPartitionSize|target number of rows to include in a partition, should be a number that targets segments of 700MB\~1GB.|yes|
|partitionDimension|the dimension to partition on. Leave blank to select a dimension automatically.|no|
|assumeGrouped|assume input data has already been grouped on time and dimensions. This is faster, but can choose suboptimal partitions if the assumption is violated.|no|
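
As a cross-reference, the fields in this table bind to the consolidated `PartitionsSpec` class that this patch introduces further down (`io.druid.indexer.partitions.PartitionsSpec`, with `@JsonProperty` names matching the keys above). A minimal sketch, assuming a plain Jackson `ObjectMapper` rather than Druid's own configured `jsonMapper`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.partitions.PartitionsSpec;

// Sketch only: deserializes the documented JSON keys into PartitionsSpec.
// readValue throws IOException; handle or declare it in real code.
ObjectMapper mapper = new ObjectMapper();
PartitionsSpec spec = mapper.readValue(
    "{\"targetPartitionSize\": 5000000, \"partitionDimension\": \"last_name\"}",
    PartitionsSpec.class
);
// partitionDimension is optional; leave it out to let the indexer pick a dimension.
```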
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 5a6d98c2835..c3d9e33e5e6 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -97,11 +97,6 @@
junit
test
-
- com.clearspring.analytics
- stream
- 2.5.2
-
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
deleted file mode 100644
index 29950fae345..00000000000
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2014 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexer;
-
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.google.common.io.Closeables;
-import com.metamx.common.ISE;
-import com.metamx.common.logger.Logger;
-import io.druid.data.input.InputRow;
-import io.druid.data.input.Rows;
-import io.druid.granularity.QueryGranularity;
-import io.druid.indexer.granularity.UniformGranularitySpec;
-import io.druid.timeline.partition.HashBasedNumberedShardSpec;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Interval;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Determines appropriate ShardSpecs for a job by determining approximate cardinality of data set using HyperLogLog
- */
-public class DetermineHashedPartitionsJob implements Jobby
-{
- private static final int MAX_SHARDS = 128;
- private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
- private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
- private final HadoopDruidIndexerConfig config;
-
- public DetermineHashedPartitionsJob(
- HadoopDruidIndexerConfig config
- )
- {
- this.config = config;
- }
-
- public boolean run()
- {
- try {
- /*
- * Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
- * in the final segment.
- */
- long startTime = System.currentTimeMillis();
- final Job groupByJob = new Job(
- new Configuration(),
- String.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
- );
-
- JobHelper.injectSystemProperties(groupByJob);
- groupByJob.setInputFormatClass(TextInputFormat.class);
- groupByJob.setMapperClass(DetermineCardinalityMapper.class);
- groupByJob.setMapOutputKeyClass(LongWritable.class);
- groupByJob.setMapOutputValueClass(BytesWritable.class);
- groupByJob.setReducerClass(DetermineCardinalityReducer.class);
- groupByJob.setOutputKeyClass(NullWritable.class);
- groupByJob.setOutputValueClass(NullWritable.class);
- groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
- groupByJob.setNumReduceTasks(1);
- JobHelper.setupClasspath(config, groupByJob);
-
- config.addInputPaths(groupByJob);
- config.intoConfiguration(groupByJob);
- FileOutputFormat.setOutputPath(groupByJob, config.makeGroupedDataDir());
-
- groupByJob.submit();
- log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
-
- if (!groupByJob.waitForCompletion(true)) {
- log.error("Job failed: %s", groupByJob.getJobID());
- return false;
- }
-
- /*
- * Load partitions and intervals determined by the previous job.
- */
-
- log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
- FileSystem fileSystem = null;
- if (!config.getSegmentGranularIntervals().isPresent()) {
- final Path intervalInfoPath = config.makeIntervalInfoPath();
- fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
- if (!fileSystem.exists(intervalInfoPath)) {
- throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
- }
-        List<Interval> intervals = config.jsonMapper.readValue(
-            Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
- {
- }
- );
- config.setGranularitySpec(new UniformGranularitySpec(config.getGranularitySpec().getGranularity(), intervals));
- log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals());
- }
-      Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
- int shardCount = 0;
- for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
- DateTime bucket = segmentGranularity.getStart();
-
- final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
- if (fileSystem == null) {
- fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
- }
- if (fileSystem.exists(partitionInfoPath)) {
-          Long cardinality = config.jsonMapper.readValue(
-              Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
- {
- }
- );
- int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
- if (numberOfShards > MAX_SHARDS) {
- throw new ISE(
- "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
- numberOfShards,
- MAX_SHARDS
- );
- }
-
-          List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
- if (numberOfShards == 1) {
- actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
- } else {
- for (int i = 0; i < numberOfShards; ++i) {
- actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
- log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
- }
- }
-
- shardSpecs.put(bucket, actualSpecs);
-
- } else {
- log.info("Path[%s] didn't exist!?", partitionInfoPath);
- }
- }
- config.setShardSpecs(shardSpecs);
- log.info(
- "DetermineHashedPartitionsJob took %d millis",
- (System.currentTimeMillis() - startTime)
- );
-
- return true;
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
-  public static class DetermineCardinalityMapper extends HadoopDruidIndexerMapper<LongWritable, BytesWritable>
- {
- private static HashFunction hashFunction = Hashing.murmur3_128();
- private QueryGranularity rollupGranularity = null;
-    private Map<Interval, HyperLogLog> hyperLogLogs;
- private HadoopDruidIndexerConfig config;
- private boolean determineIntervals;
-
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException
- {
- super.setup(context);
- rollupGranularity = getConfig().getRollupSpec().getRollupGranularity();
- config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
-      Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
- if (intervals.isPresent()) {
- determineIntervals = false;
-        final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
- for (final Interval bucketInterval : intervals.get()) {
- builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
- }
- hyperLogLogs = builder.build();
- } else {
- determineIntervals = true;
- hyperLogLogs = Maps.newHashMap();
- }
- }
-
- @Override
- protected void innerMap(
- InputRow inputRow,
- Text text,
- Context context
- ) throws IOException, InterruptedException
- {
-
-      final List<Object> groupKey = Rows.toGroupKey(
- rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
- inputRow
- );
- Interval interval;
- if (determineIntervals) {
- interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
-
- if (!hyperLogLogs.containsKey(interval)) {
- hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
- }
- } else {
-        final Optional<Interval> maybeInterval = config.getGranularitySpec()
- .bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()));
-
- if (!maybeInterval.isPresent()) {
- throw new ISE("WTF?! No bucket found for timestamp: %s", inputRow.getTimestampFromEpoch());
- }
- interval = maybeInterval.get();
- }
- hyperLogLogs.get(interval)
- .offerHashed(
- hashFunction.hashBytes(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey))
- .asLong()
- );
- }
-
- @Override
- public void run(Context context) throws IOException, InterruptedException
- {
- setup(context);
-
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(), context);
- }
-
-      for (Map.Entry<Interval, HyperLogLog> entry : hyperLogLogs.entrySet()) {
- context.write(
- new LongWritable(entry.getKey().getStartMillis()),
- new BytesWritable(entry.getValue().getBytes())
- );
- }
- cleanup(context);
- }
-
- }
-
- public static class DetermineCardinalityReducer
-      extends Reducer<LongWritable, BytesWritable, NullWritable, NullWritable>
- {
-    private final List<Interval> intervals = Lists.newArrayList();
- protected HadoopDruidIndexerConfig config = null;
-
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException
- {
- config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
- }
-
- @Override
- protected void reduce(
- LongWritable key,
-        Iterable<BytesWritable> values,
- Context context
- ) throws IOException, InterruptedException
- {
- HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
- for (BytesWritable value : values) {
- HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
- try {
- aggregate.addAll(logValue);
- }
- catch (CardinalityMergeException e) {
- e.printStackTrace(); // TODO: check for better handling
- }
- }
- Interval interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(key.get()));
- intervals.add(interval);
- final Path outPath = config.makeSegmentPartitionInfoPath(interval);
- final OutputStream out = Utils.makePathAndOutputStream(
- context, outPath, config.isOverwriteFiles()
- );
-
- try {
- HadoopDruidIndexerConfig.jsonMapper.writerWithType(
-            new TypeReference<Long>()
- {
- }
- ).writeValue(
- out,
- aggregate.cardinality()
- );
- }
- finally {
- Closeables.close(out, false);
- }
- }
-
- @Override
- public void run(Context context)
- throws IOException, InterruptedException
- {
- super.run(context);
- if (!config.getSegmentGranularIntervals().isPresent()) {
- final Path outPath = config.makeIntervalInfoPath();
- final OutputStream out = Utils.makePathAndOutputStream(
- context, outPath, config.isOverwriteFiles()
- );
-
- try {
- HadoopDruidIndexerConfig.jsonMapper.writerWithType(
-              new TypeReference<List<Interval>>()
- {
- }
- ).writeValue(
- out,
- intervals
- );
- }
- finally {
- Closeables.close(out, false);
- }
- }
- }
- }
-}
-
-
-
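
For context on the file removed above: `DetermineHashedPartitionsJob` estimated the per-interval cardinality of `(timestamp, dimensions)` group keys with HyperLogLog and then sized the hash-based shards from that estimate. A minimal sketch of that arithmetic, with an invented cardinality value purely for illustration (the constant 128 is `MAX_SHARDS` in the deleted code):

```java
// Illustrative numbers only; the formula and the cap mirror the deleted job above.
long cardinality = 9500000L;          // distinct group keys per interval, estimated by HyperLogLog
long targetPartitionSize = 5000000L;  // from the partitionsSpec
int numberOfShards = (int) Math.ceil((double) cardinality / targetPartitionSize); // -> 2
if (numberOfShards > 128) {           // MAX_SHARDS in the deleted code
  throw new IllegalStateException("targetPartitionSize too low or data volume too high");
}
```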
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 3ebefb7bddc..30f9471e445 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -26,7 +26,9 @@ import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -38,9 +40,7 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.common.logger.Logger;
import io.druid.collections.CombiningIterable;
import io.druid.data.input.InputRow;
-import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
-import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
@@ -76,6 +76,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
@@ -106,6 +107,16 @@ public class DeterminePartitionsJob implements Jobby
this.config = config;
}
+ public static void injectSystemProperties(Job job)
+ {
+ final Configuration conf = job.getConfiguration();
+ for (String propName : System.getProperties().stringPropertyNames()) {
+ if (propName.startsWith("hadoop.")) {
+ conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+ }
+ }
+ }
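
The method added above mirrors the helper being removed from `JobHelper` later in this patch: any JVM system property prefixed with `hadoop.` is copied into the job's `Configuration` with the prefix stripped. A small usage sketch; the property name and value are hypothetical:

```java
// Hypothetical example: the indexer JVM was started with
//   -Dhadoop.fs.defaultFS=hdfs://namenode:9000
System.setProperty("hadoop.fs.defaultFS", "hdfs://namenode:9000");

// Job's constructor throws IOException; handle or declare it in real code.
Job job = new Job(new Configuration(), "example");
DeterminePartitionsJob.injectSystemProperties(job);

// The job configuration now carries the stripped key.
assert "hdfs://namenode:9000".equals(job.getConfiguration().get("fs.defaultFS"));
```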
+
public boolean run()
{
try {
@@ -114,17 +125,13 @@ public class DeterminePartitionsJob implements Jobby
* in the final segment.
*/
- if(!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)){
- throw new ISE("DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]", config.getPartitionsSpec());
- }
-
if (!config.getPartitionsSpec().isAssumeGrouped()) {
final Job groupByJob = new Job(
new Configuration(),
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
);
- JobHelper.injectSystemProperties(groupByJob);
+ injectSystemProperties(groupByJob);
groupByJob.setInputFormatClass(TextInputFormat.class);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
@@ -161,7 +168,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
- JobHelper.injectSystemProperties(dimSelectionJob);
+ injectSystemProperties(dimSelectionJob);
if (!config.getPartitionsSpec().isAssumeGrouped()) {
// Read grouped data from the groupByJob.
@@ -183,7 +190,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
- dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
+ dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().size());
JobHelper.setupClasspath(config, dimSelectionJob);
config.intoConfiguration(dimSelectionJob);
@@ -209,8 +216,10 @@ public class DeterminePartitionsJob implements Jobby
FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
- for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
- final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
+ for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
+ DateTime bucket = segmentGranularity.getStart();
+
+ final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0));
if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
}
@@ -224,10 +233,10 @@ public class DeterminePartitionsJob implements Jobby
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (int i = 0; i < specs.size(); ++i) {
actualSpecs.add(new HadoopyShardSpec(specs.get(i), shardCount++));
- log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
+ log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
}
- shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
+ shardSpecs.put(bucket, actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
@@ -260,9 +269,17 @@ public class DeterminePartitionsJob implements Jobby
Context context
) throws IOException, InterruptedException
{
-      final List<Object> groupKey = Rows.toGroupKey(
+      // Create group key, there are probably more efficient ways of doing this
+      final Map<String, Set<String>> dims = Maps.newTreeMap();
+      for (final String dim : inputRow.getDimensions()) {
+        final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
+        if (dimValues.size() > 0) {
+          dims.put(dim, dimValues);
+        }
+      }
+      final List<Object> groupKey = ImmutableList.of(
rollupGranularity.truncate(inputRow.getTimestampFromEpoch()),
- inputRow
+ dims
);
context.write(
new BytesWritable(HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey)),
@@ -298,8 +315,8 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
- SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
- helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+ final String partitionDimension = config.getPartitionDimension();
+ helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
}
@Override
@@ -330,8 +347,8 @@ public class DeterminePartitionsJob implements Jobby
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
- final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
- helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+ final String partitionDimension = config.getPartitionDimension();
+ helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
}
@Override
@@ -366,7 +383,7 @@ public class DeterminePartitionsJob implements Jobby
final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
- for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) {
+ for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
timeIndexBuilder.put(bucketInterval.getStart(), idx);
idx++;
}
@@ -724,7 +741,7 @@ public class DeterminePartitionsJob implements Jobby
}
final OutputStream out = Utils.makePathAndOutputStream(
- context, config.makeSegmentPartitionInfoPath(config.getGranularitySpec().getGranularity().bucket(bucket)), config.isOverwriteFiles()
+ context, config.makeSegmentPartitionInfoPath(new Bucket(0, bucket, 0)), config.isOverwriteFiles()
);
final DimPartitions chosenPartitions = maxCardinality > HIGH_CARDINALITY_THRESHOLD
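
The mapper change in this file (replacing `Rows.toGroupKey`) builds the group key inline: the rolled-up timestamp plus a sorted map of dimension values, serialized to JSON so that identical rows collapse onto the same reducer key. A rough sketch of what one such key looks like once serialized; the row values here are invented for illustration:

```java
// Hypothetical row: 2012-01-01T03:24:00Z, last_name=Smith, rolled up to DAY granularity.
// The serialized map output key would look roughly like:
//   [1325376000000, {"last_name": ["Smith"]}]
// i.e. [truncated epoch millis, {dimension -> sorted set of values}].
final Map<String, Set<String>> dims = Maps.newTreeMap();
dims.put("last_name", ImmutableSortedSet.of("Smith"));
final List<Object> groupKey = ImmutableList.of(1325376000000L, dims);
// writeValueAsBytes throws JsonProcessingException; handle or declare it in real code.
byte[] keyBytes = HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(groupKey);
```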
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
deleted file mode 100644
index 2076292260d..00000000000
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexer;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Inject;
-import com.metamx.common.logger.Logger;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Interval;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class HadoopDruidDetermineConfigurationJob implements Jobby
-{
- private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
- private final HadoopDruidIndexerConfig config;
-
- @Inject
- public HadoopDruidDetermineConfigurationJob(
- HadoopDruidIndexerConfig config
- )
- {
- this.config = config;
- }
-
- @Override
- public boolean run()
- {
-    List<Jobby> jobs = Lists.newArrayList();
-
- JobHelper.ensurePaths(config);
-
- if (config.isDeterminingPartitions()) {
- jobs.add(config.getPartitionsSpec().getPartitionJob(config));
- } else {
-      Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
- int shardCount = 0;
- for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
- DateTime bucket = segmentGranularity.getStart();
- final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
- shardSpecs.put(bucket, Lists.newArrayList(spec));
- log.info("DateTime[%s], spec[%s]", bucket, spec);
- }
- config.setShardSpecs(shardSpecs);
- }
-
- return JobHelper.runJobs(jobs, config);
-
- }
-
-}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 881e21e4f78..19586c14113 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -50,7 +50,6 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
-import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
@@ -74,7 +73,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
/**
*/
@@ -180,7 +178,7 @@ public class HadoopDruidIndexerConfig
this.partitionsSpec = partitionsSpec;
} else {
// Backwards compatibility
- this.partitionsSpec = new SingleDimensionPartitionsSpec(partitionDimension, targetPartitionSize, null, false);
+ this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if (granularitySpec != null) {
@@ -380,17 +378,17 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
-  public Optional<List<Interval>> getIntervals()
+  public List<Interval> getIntervals()
{
-    Optional<SortedSet<Interval>> setOptional = getGranularitySpec().bucketIntervals();
-    if (setOptional.isPresent()) {
-      return Optional.of((List<Interval>) JodaUtils.condenseIntervals(setOptional.get()));
- } else {
- return Optional.absent();
- }
+ return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
}
- public boolean isDeterminingPartitions()
+ public String getPartitionDimension()
+ {
+ return partitionsSpec.getPartitionDimension();
+ }
+
+ public boolean partitionByDimension()
{
return partitionsSpec.isDeterminingPartitions();
}
@@ -485,70 +483,64 @@ public class HadoopDruidIndexerConfig
throw new ISE("row[%s] doesn't fit in any shard[%s]", inputRow, shards);
}
-  public Optional<Set<Interval>> getSegmentGranularIntervals()
+  public Set<Interval> getSegmentGranularIntervals()
{
-    return Optional.fromNullable((Set<Interval>) granularitySpec.bucketIntervals().orNull());
+ return granularitySpec.bucketIntervals();
}
-  public Optional<Iterable<Bucket>> getAllBuckets()
+  public Iterable<Bucket> getAllBuckets()
{
-    Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
- if (intervals.isPresent()) {
- return Optional.of(
-          (Iterable<Bucket>) FunctionalIterable
- .create(intervals.get())
- .transformCat(
-                  new Function<Interval, Iterable<Bucket>>()
- {
- @Override
-                    public Iterable<Bucket> apply(Interval input)
- {
- final DateTime bucketTime = input.getStart();
-                      final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime);
- if (specs == null) {
- return ImmutableList.of();
- }
+ return FunctionalIterable
+ .create(getSegmentGranularIntervals())
+ .transformCat(
+            new Function<Interval, Iterable<Bucket>>()
+ {
+ @Override
+              public Iterable<Bucket> apply(Interval input)
+ {
+ final DateTime bucketTime = input.getStart();
+                final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime);
+ if (specs == null) {
+ return ImmutableList.of();
+ }
- return FunctionalIterable
- .create(specs)
- .transform(
-                              new Function<HadoopyShardSpec, Bucket>()
- {
- int i = 0;
+ return FunctionalIterable
+ .create(specs)
+ .transform(
+                        new Function<HadoopyShardSpec, Bucket>()
+ {
+ int i = 0;
- @Override
- public Bucket apply(HadoopyShardSpec input)
- {
- return new Bucket(input.getShardNum(), bucketTime, i++);
- }
- }
- );
- }
- }
- )
- );
- } else {
- return Optional.absent();
- }
+ @Override
+ public Bucket apply(HadoopyShardSpec input)
+ {
+ return new Bucket(input.getShardNum(), bucketTime, i++);
+ }
+ }
+ );
+ }
+ }
+ );
}
- /******************************************
- Path helper logic
- ******************************************/
-
- /**
- * Make the intermediate path for this job run.
- *
- * @return the intermediate path for this job run.
- */
+ /******************************************
+ Path helper logic
+ ******************************************/
+ /**
+ * Make the intermediate path for this job run.
+ *
+ * @return the intermediate path for this job run.
+ */
public Path makeIntermediatePath()
{
return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", "")));
}
- public Path makeSegmentPartitionInfoPath(Interval bucketInterval)
+ public Path makeSegmentPartitionInfoPath(Bucket bucket)
{
+ final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
+
return new Path(
String.format(
"%s/%s_%s/partitions.json",
@@ -559,16 +551,6 @@ public class HadoopDruidIndexerConfig
);
}
- public Path makeIntervalInfoPath()
- {
- return new Path(
- String.format(
- "%s/intervals.json",
- makeIntermediatePath()
- )
- );
- }
-
public Path makeDescriptorInfoDir()
{
return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
@@ -643,5 +625,8 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
Preconditions.checkNotNull(version, "version");
Preconditions.checkNotNull(rollupSpec, "rollupSpec");
+
+ final int nIntervals = getIntervals().size();
+ Preconditions.checkArgument(nIntervals > 0, "intervals.size()[%s] <= 0", nIntervals);
}
}
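
The reworked `getAllBuckets()` above enumerates the same buckets as the old `Optional`-wrapped version, just without the wrapper. An equivalent plain-loop sketch of what it yields, written as if inside `HadoopDruidIndexerConfig` since it reads the `shardSpecs` field:

```java
// One Bucket per HadoopyShardSpec, grouped by the start time of each segment interval.
List<Bucket> buckets = Lists.newArrayList();
for (Interval interval : getSegmentGranularIntervals()) {
  final DateTime bucketTime = interval.getStart();
  final List<HadoopyShardSpec> specs = shardSpecs.get(bucketTime);
  if (specs == null) {
    continue; // no shards registered for this interval
  }
  int i = 0;
  for (HadoopyShardSpec spec : specs) {
    buckets.add(new Bucket(spec.getShardNum(), bucketTime, i++));
  }
}
```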
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java
index 5e7adc2ac82..b077cf2fc22 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java
@@ -46,10 +46,6 @@ public class HadoopDruidIndexerConfigBuilder
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
}
- public static HadoopDruidIndexerSchema toSchema(HadoopDruidIndexerConfig config){
- return HadoopDruidIndexerConfig.jsonMapper.convertValue(config, HadoopDruidIndexerSchema.class);
- }
-
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
index 598b3a8db1d..2c593af68a3 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
@@ -19,20 +19,34 @@
package io.druid.indexer;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.inject.Inject;
+import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
+import org.joda.time.Interval;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
*/
public class HadoopDruidIndexerJob implements Jobby
{
private static final Logger log = new Logger(HadoopDruidIndexerJob.class);
+
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob;
+
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
@@ -55,7 +69,23 @@ public class HadoopDruidIndexerJob implements Jobby
public boolean run()
{
List<Jobby> jobs = Lists.newArrayList();
- JobHelper.ensurePaths(config);
+
+ ensurePaths();
+
+ if (config.partitionByDimension()) {
+ jobs.add(new DeterminePartitionsJob(config));
+ }
+ else {
+      Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
+ int shardCount = 0;
+ for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
+ DateTime bucket = segmentGranularity.getStart();
+ final HadoopyShardSpec spec = new HadoopyShardSpec(new NoneShardSpec(), shardCount++);
+ shardSpecs.put(bucket, Lists.newArrayList(spec));
+ log.info("DateTime[%s], spec[%s]", bucket, spec);
+ }
+ config.setShardSpecs(shardSpecs);
+ }
indexJob = new IndexGeneratorJob(config);
jobs.add(indexJob);
@@ -66,15 +96,65 @@ public class HadoopDruidIndexerJob implements Jobby
log.info("No updaterJobSpec set, not uploading to database");
}
+ String failedMessage = null;
+ for (Jobby job : jobs) {
+ if (failedMessage == null) {
+ if (!job.run()) {
+ failedMessage = String.format("Job[%s] failed!", job.getClass());
+ }
+ }
+ }
+
+ if (failedMessage == null) {
+ publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
+ }
+
+ if (!config.isLeaveIntermediate()) {
+ if (failedMessage == null || config.isCleanupOnFailure()) {
+ Path workingPath = config.makeIntermediatePath();
+ log.info("Deleting path[%s]", workingPath);
+ try {
+ workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
+ }
+ catch (IOException e) {
+ log.error(e, "Failed to cleanup path[%s]", workingPath);
+ }
+ }
+ }
+
+ if (failedMessage != null) {
+ throw new ISE(failedMessage);
+ }
- JobHelper.runJobs(jobs, config);
- publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
return true;
}
-  public List<DataSegment> getPublishedSegments()
+ private void ensurePaths()
{
- if (publishedSegments == null) {
+ // config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else
+ try {
+ Job job = new Job(
+ new Configuration(),
+ String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
+ );
+
+ job.getConfiguration().set("io.sort.record.percent", "0.19");
+ for (String propName : System.getProperties().stringPropertyNames()) {
+ Configuration conf = job.getConfiguration();
+ if (propName.startsWith("hadoop.")) {
+ conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+ }
+ }
+
+ config.addInputPaths(job);
+ }
+ catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+  public List<DataSegment> getPublishedSegments() {
+ if(publishedSegments == null) {
throw new IllegalStateException("Job hasn't run yet. No segments have been published yet.");
}
return publishedSegments;
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
index 9ccb351ba65..2eedaf76d31 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
@@ -22,7 +22,6 @@ package io.druid.indexer;
import com.metamx.common.RE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
-import io.druid.indexer.granularity.GranularitySpec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
@@ -71,9 +70,8 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
throw e;
}
}
- GranularitySpec spec = config.getGranularitySpec();
- if (!spec.bucketIntervals().isPresent() || spec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
- .isPresent()) {
+
+ if(config.getGranularitySpec().bucketInterval(new DateTime(inputRow.getTimestampFromEpoch())).isPresent()) {
innerMap(inputRow, value, context);
}
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 7e3ccbe437e..fde7161d8a4 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -44,7 +44,6 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -85,7 +84,9 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby
{
private static final Logger log = new Logger(IndexGeneratorJob.class);
+
private final HadoopDruidIndexerConfig config;
+
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
@@ -96,6 +97,65 @@ public class IndexGeneratorJob implements Jobby
this.jobStats = new IndexGeneratorStats();
}
+ public IndexGeneratorStats getJobStats()
+ {
+ return jobStats;
+ }
+
+ public boolean run()
+ {
+ try {
+ Job job = new Job(
+ new Configuration(),
+ String.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals())
+ );
+
+ job.getConfiguration().set("io.sort.record.percent", "0.23");
+
+ for (String propName : System.getProperties().stringPropertyNames()) {
+ Configuration conf = job.getConfiguration();
+ if (propName.startsWith("hadoop.")) {
+ conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+ }
+ }
+
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setMapperClass(IndexGeneratorMapper.class);
+ job.setMapOutputValueClass(Text.class);
+
+ SortableBytes.useSortableBytesAsMapOutputKey(job);
+
+ job.setNumReduceTasks(Iterables.size(config.getAllBuckets()));
+ job.setPartitionerClass(IndexGeneratorPartitioner.class);
+
+ job.setReducerClass(IndexGeneratorReducer.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
+
+ config.addInputPaths(job);
+ config.intoConfiguration(job);
+
+ JobHelper.setupClasspath(config, job);
+
+ job.submit();
+ log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
+
+ boolean success = job.waitForCompletion(true);
+
+ Counter invalidRowCount = job.getCounters()
+ .findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
+ jobStats.setInvalidRowCount(invalidRowCount.getValue());
+
+ return success;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
@@ -129,60 +189,6 @@ public class IndexGeneratorJob implements Jobby
return publishedSegments;
}
- public IndexGeneratorStats getJobStats()
- {
- return jobStats;
- }
-
- public boolean run()
- {
- try {
- Job job = new Job(
- new Configuration(),
- String.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals())
- );
-
- job.getConfiguration().set("io.sort.record.percent", "0.23");
-
- JobHelper.injectSystemProperties(job);
-
- job.setInputFormatClass(TextInputFormat.class);
-
- job.setMapperClass(IndexGeneratorMapper.class);
- job.setMapOutputValueClass(Text.class);
-
- SortableBytes.useSortableBytesAsMapOutputKey(job);
-
- job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
- job.setPartitionerClass(IndexGeneratorPartitioner.class);
-
- job.setReducerClass(IndexGeneratorReducer.class);
- job.setOutputKeyClass(BytesWritable.class);
- job.setOutputValueClass(Text.class);
- job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
- FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
-
- config.addInputPaths(job);
- config.intoConfiguration(job);
-
- JobHelper.setupClasspath(config, job);
-
- job.submit();
- log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
-
- boolean success = job.waitForCompletion(true);
-
- Counter invalidRowCount = job.getCounters()
- .findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
- jobStats.setInvalidRowCount(invalidRowCount.getValue());
-
- return success;
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{
@@ -210,9 +216,8 @@ public class IndexGeneratorJob implements Jobby
}
}
-  public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text> implements Configurable
+  public static class IndexGeneratorPartitioner extends Partitioner<BytesWritable, Text>
{
- private Configuration config;
@Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
@@ -220,27 +225,12 @@ public class IndexGeneratorJob implements Jobby
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
int shardNum = bytes.getInt();
- if (config.get("mapred.job.tracker").equals("local")) {
- return shardNum % numPartitions;
- } else {
- if (shardNum >= numPartitions) {
- throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
- }
- return shardNum;
+ if (shardNum >= numPartitions) {
+ throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
}
- }
- @Override
- public Configuration getConf()
- {
- return config;
- }
-
- @Override
- public void setConf(Configuration config)
- {
- this.config = config;
+ return shardNum;
}
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 08cb3a25bfd..654f70b5b4d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -20,11 +20,9 @@
package io.druid.indexer;
import com.google.api.client.util.Sets;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
-import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -36,7 +34,6 @@ import org.apache.hadoop.mapreduce.Job;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.List;
import java.util.Set;
/**
@@ -97,63 +94,4 @@ public class JobHelper
}
}
}
-
- public static void injectSystemProperties(Job job)
- {
- final Configuration conf = job.getConfiguration();
- for (String propName : System.getProperties().stringPropertyNames()) {
- if (propName.startsWith("hadoop.")) {
- conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
- }
- }
- }
-
- public static void ensurePaths(HadoopDruidIndexerConfig config)
- {
- // config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else
- try {
- Job job = new Job(
- new Configuration(),
- String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals())
- );
-
- job.getConfiguration().set("io.sort.record.percent", "0.19");
- injectSystemProperties(job);
-
- config.addInputPaths(job);
- }
- catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- public static boolean runJobs(List jobs, HadoopDruidIndexerConfig config){
- String failedMessage = null;
- for (Jobby job : jobs) {
- if (failedMessage == null) {
- if (!job.run()) {
- failedMessage = String.format("Job[%s] failed!", job.getClass());
- }
- }
- }
-
- if (!config.isLeaveIntermediate()) {
- if (failedMessage == null || config.isCleanupOnFailure()) {
- Path workingPath = config.makeIntermediatePath();
- log.info("Deleting path[%s]", workingPath);
- try {
- workingPath.getFileSystem(new Configuration()).delete(workingPath, true);
- }
- catch (IOException e) {
- log.error(e, "Failed to cleanup path[%s]", workingPath);
- }
- }
- }
-
- if (failedMessage != null) {
- throw new ISE(failedMessage);
- }
-
- return true;
- }
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java
index c78185092d2..b64ffcc1bf9 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java
@@ -72,9 +72,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
@Override
@JsonProperty("intervals")
-  public Optional<SortedSet<Interval>> bucketIntervals()
+  public SortedSet<Interval> bucketIntervals()
{
-    return Optional.of((SortedSet<Interval>) intervals);
+ return intervals;
}
@Override
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java
index 0b36c803d93..5e7d0d7ab06 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java
@@ -40,7 +40,7 @@ import java.util.SortedSet;
public interface GranularitySpec
{
/** Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.*/
-  public Optional<SortedSet<Interval>> bucketIntervals();
+  public SortedSet<Interval> bucketIntervals();
/** Time-grouping interval corresponding to some instant, if any. */
public Optional<Interval> bucketInterval(DateTime dt);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java
index a39a9bce0f4..726426c2a14 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java
@@ -35,7 +35,7 @@ import java.util.SortedSet;
public class UniformGranularitySpec implements GranularitySpec
{
final private Granularity granularity;
-  final private Iterable<Interval> inputIntervals;
+  final private List<Interval> inputIntervals;
final private ArbitraryGranularitySpec wrappedSpec;
@JsonCreator
@@ -44,28 +44,21 @@ public class UniformGranularitySpec implements GranularitySpec
@JsonProperty("intervals") List inputIntervals
)
{
- this.granularity = granularity;
- if (inputIntervals != null) {
-      List<Interval> granularIntervals = Lists.newArrayList();
- for (Interval inputInterval : inputIntervals) {
- Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
- }
- this.inputIntervals = ImmutableList.copyOf(inputIntervals);
- this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
- } else {
- this.inputIntervals = null;
- this.wrappedSpec = null;
+    List<Interval> granularIntervals = Lists.newArrayList();
+
+ for (Interval inputInterval : inputIntervals) {
+ Iterables.addAll(granularIntervals, granularity.getIterable(inputInterval));
}
+
+ this.granularity = granularity;
+ this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+ this.wrappedSpec = new ArbitraryGranularitySpec(granularIntervals);
}
@Override
-  public Optional<SortedSet<Interval>> bucketIntervals()
+  public SortedSet<Interval> bucketIntervals()
{
- if (wrappedSpec == null) {
- return Optional.absent();
- } else {
- return wrappedSpec.bucketIntervals();
- }
+ return wrappedSpec.bucketIntervals();
}
@Override
@@ -82,8 +75,8 @@ public class UniformGranularitySpec implements GranularitySpec
}
@JsonProperty("intervals")
-  public Optional<Iterable<Interval>> getIntervals()
+  public Iterable<Interval> getIntervals()
{
- return Optional.fromNullable(inputIntervals);
+ return inputIntervals;
}
}
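
The constructor change above means the input intervals are always expanded through the granularity (the null-tolerant branch is gone, and `bucketIntervals()` returns the set directly instead of an `Optional`). A small sketch of that expansion, assuming the `Granularity.DAY` constant from `com.metamx.common.Granularity`:

```java
// One three-day input interval becomes three day-sized bucket intervals:
// the buckets for 2012-01-01, 2012-01-02 and 2012-01-03.
UniformGranularitySpec spec = new UniformGranularitySpec(
    Granularity.DAY,
    Lists.newArrayList(new Interval("2012-01-01/P3D"))
);

for (Interval bucket : spec.bucketIntervals()) {
  System.out.println(bucket);
}
```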
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
deleted file mode 100644
index 90fab3e0435..00000000000
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexer.partitions;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-
-public abstract class AbstractPartitionsSpec implements PartitionsSpec
-{
- private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
- private final long targetPartitionSize;
- private final long maxPartitionSize;
- private final boolean assumeGrouped;
-
- public AbstractPartitionsSpec(
- Long targetPartitionSize,
- Long maxPartitionSize,
- Boolean assumeGrouped
- )
- {
- this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
- this.maxPartitionSize = maxPartitionSize == null
- ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
- : maxPartitionSize;
- this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
- }
-
- @JsonProperty
- public long getTargetPartitionSize()
- {
- return targetPartitionSize;
- }
-
- @JsonProperty
- public long getMaxPartitionSize()
- {
- return maxPartitionSize;
- }
-
- @JsonProperty
- public boolean isAssumeGrouped()
- {
- return assumeGrouped;
- }
-
- @Override
- public boolean isDeterminingPartitions()
- {
- return targetPartitionSize > 0;
- }
-}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
index 2186c584879..bf38042cf0d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/PartitionsSpec.java
@@ -19,33 +19,69 @@
package io.druid.indexer.partitions;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleDimensionPartitionsSpec.class)
-@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
- @JsonSubTypes.Type(name = "random", value = RandomPartitionsSpec.class)
-})
-public interface PartitionsSpec
+import javax.annotation.Nullable;
+
+public class PartitionsSpec
{
- @JsonIgnore
- public Jobby getPartitionJob(HadoopDruidIndexerConfig config);
+ private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
- @JsonProperty
- public long getTargetPartitionSize();
+ @Nullable
+ private final String partitionDimension;
- @JsonProperty
- public long getMaxPartitionSize();
+ private final long targetPartitionSize;
- @JsonProperty
- public boolean isAssumeGrouped();
+ private final long maxPartitionSize;
+
+ private final boolean assumeGrouped;
+
+ @JsonCreator
+ public PartitionsSpec(
+ @JsonProperty("partitionDimension") @Nullable String partitionDimension,
+ @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+ @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
+ @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+ )
+ {
+ this.partitionDimension = partitionDimension;
+ this.targetPartitionSize = targetPartitionSize == null ? -1 : targetPartitionSize;
+ this.maxPartitionSize = maxPartitionSize == null
+ ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
+ : maxPartitionSize;
+ this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
+ }
@JsonIgnore
- public boolean isDeterminingPartitions();
+ public boolean isDeterminingPartitions()
+ {
+ return targetPartitionSize > 0;
+ }
+ @JsonProperty
+ @Nullable
+ public String getPartitionDimension()
+ {
+ return partitionDimension;
+ }
+
+ @JsonProperty
+ public long getTargetPartitionSize()
+ {
+ return targetPartitionSize;
+ }
+
+ @JsonProperty
+ public long getMaxPartitionSize()
+ {
+ return maxPartitionSize;
+ }
+
+ @JsonProperty
+ public boolean isAssumeGrouped()
+ {
+ return assumeGrouped;
+ }
}
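
A brief note on the defaults encoded in the constructor above: a missing `targetPartitionSize` becomes `-1`, which switches partition determination off, and a missing `maxPartitionSize` defaults to 1.5x the target. A minimal sketch of the resulting behaviour:

```java
// No target size given: stored as -1, so isDeterminingPartitions() is false and
// HadoopDruidIndexerJob.run() falls back to a single NoneShardSpec per interval.
PartitionsSpec noPartitions = new PartitionsSpec(null, null, null, null);
assert !noPartitions.isDeterminingPartitions();

// Target size given, max left null: max defaults to 1.5 * target.
PartitionsSpec partitioned = new PartitionsSpec("last_name", 5000000L, null, null);
assert partitioned.isDeterminingPartitions();
assert partitioned.getMaxPartitionSize() == 7500000L;
```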
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
deleted file mode 100644
index 7b13a1bf663..00000000000
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/RandomPartitionsSpec.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexer.partitions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DetermineHashedPartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
-
-import javax.annotation.Nullable;
-
-public class RandomPartitionsSpec extends AbstractPartitionsSpec
-{
- @JsonCreator
- public RandomPartitionsSpec(
- @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
- @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
- @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
- )
- {
- super(targetPartitionSize, maxPartitionSize, assumeGrouped);
- }
-
- @Override
- public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
- {
- return new DetermineHashedPartitionsJob(config);
- }
-}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
deleted file mode 100644
index 118d1355914..00000000000
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexer.partitions;
-
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.druid.indexer.DeterminePartitionsJob;
-import io.druid.indexer.HadoopDruidIndexerConfig;
-import io.druid.indexer.Jobby;
-
-import javax.annotation.Nullable;
-
-public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
-{
- @Nullable
- private final String partitionDimension;
-
- @JsonCreator
- public SingleDimensionPartitionsSpec(
- @JsonProperty("partitionDimension") @Nullable String partitionDimension,
- @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
- @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
- @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
- )
- {
- super(targetPartitionSize, maxPartitionSize, assumeGrouped);
- this.partitionDimension = partitionDimension;
- }
-
- @JsonProperty
- @Nullable
- public String getPartitionDimension()
- {
- return partitionDimension;
- }
-
- @Override
- public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
- {
- return new DeterminePartitionsJob(config);
- }
-}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java
index 005d2934e3c..adc00c77c49 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java
@@ -20,7 +20,6 @@
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
@@ -100,12 +99,9 @@ public class GranularityPathSpec implements PathSpec
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
{
final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervals());
- Optional<Set<Interval>> optionalIntervals = config.getSegmentGranularIntervals();
- if (optionalIntervals.isPresent()) {
- for (Interval segmentInterval : optionalIntervals.get()) {
- for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
- intervals.add(dataInterval);
- }
+ for (Interval segmentInterval : config.getSegmentGranularIntervals()) {
+ for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
+ intervals.add(dataInterval);
}
}
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index f6c8a1f66d0..b36f2822d26 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -25,8 +25,6 @@ import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
-import io.druid.indexer.partitions.RandomPartitionsSpec;
-import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -67,7 +65,7 @@ public class HadoopDruidIndexerConfigTest
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-01-01/P1D")),
- granularitySpec.getIntervals().get()
+ granularitySpec.getIntervals()
);
Assert.assertEquals(
@@ -101,7 +99,7 @@ public class HadoopDruidIndexerConfigTest
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-02-01/P1D")),
- granularitySpec.getIntervals().get()
+ granularitySpec.getIntervals()
);
Assert.assertEquals(
@@ -169,14 +167,15 @@ public class HadoopDruidIndexerConfigTest
100
);
- Assert.assertTrue(
- "partitionSpec",
- partitionsSpec instanceof SingleDimensionPartitionsSpec
+ Assert.assertEquals(
+ "getPartitionDimension",
+ partitionsSpec.getPartitionDimension(),
+ null
);
}
@Test
- public void testPartitionsSpecSpecificDimensionLegacy()
+ public void testPartitionsSpecSpecificDimension()
{
final HadoopDruidIndexerConfig cfg;
@@ -215,10 +214,9 @@ public class HadoopDruidIndexerConfigTest
150
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
- ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+ partitionsSpec.getPartitionDimension(),
"foo"
);
}
@@ -261,10 +259,9 @@ public class HadoopDruidIndexerConfigTest
150
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
- ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+ partitionsSpec.getPartitionDimension(),
"foo"
);
}
@@ -310,10 +307,9 @@ public class HadoopDruidIndexerConfigTest
200
);
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof SingleDimensionPartitionsSpec);
Assert.assertEquals(
"getPartitionDimension",
- ((SingleDimensionPartitionsSpec)partitionsSpec).getPartitionDimension(),
+ partitionsSpec.getPartitionDimension(),
"foo"
);
}
@@ -424,6 +420,7 @@ public class HadoopDruidIndexerConfigTest
);
}
+
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
@@ -500,47 +497,4 @@ public class HadoopDruidIndexerConfigTest
throw Throwables.propagate(e);
}
}
-
- public void testRandomPartitionsSpec() throws Exception{
- {
- final HadoopDruidIndexerConfig cfg;
-
- try {
- cfg = jsonReadWriteRead(
- "{"
- + "\"partitionsSpec\":{"
- + " \"targetPartitionSize\":100,"
- + " \"type\":\"random\""
- + " }"
- + "}",
- HadoopDruidIndexerConfig.class
- );
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
-
- final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();
-
- Assert.assertEquals(
- "isDeterminingPartitions",
- partitionsSpec.isDeterminingPartitions(),
- true
- );
-
- Assert.assertEquals(
- "getTargetPartitionSize",
- partitionsSpec.getTargetPartitionSize(),
- 100
- );
-
- Assert.assertEquals(
- "getMaxPartitionSize",
- partitionsSpec.getMaxPartitionSize(),
- 150
- );
-
- Assert.assertTrue("partitionsSpec" , partitionsSpec instanceof RandomPartitionsSpec);
- }
- }
}
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/granularity/ArbitraryGranularityTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/granularity/ArbitraryGranularityTest.java
index d934cb963ed..18e8f27a824 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/granularity/ArbitraryGranularityTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/granularity/ArbitraryGranularityTest.java
@@ -54,7 +54,7 @@ public class ArbitraryGranularityTest
new Interval("2012-01-08T00Z/2012-01-11T00Z"),
new Interval("2012-02-01T00Z/2012-03-01T00Z")
),
- Lists.newArrayList(spec.bucketIntervals().get())
+ Lists.newArrayList(spec.bucketIntervals())
);
Assert.assertEquals(
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/granularity/UniformGranularityTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/granularity/UniformGranularityTest.java
index 31c9cdb3e80..72ce24f7d47 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/granularity/UniformGranularityTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/granularity/UniformGranularityTest.java
@@ -57,7 +57,7 @@ public class UniformGranularityTest
new Interval("2012-01-09T00Z/P1D"),
new Interval("2012-01-10T00Z/P1D")
),
- Lists.newArrayList(spec.bucketIntervals().get())
+ Lists.newArrayList(spec.bucketIntervals())
);
Assert.assertEquals(
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 30d0750f3d0..233714f5c71 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -24,23 +24,25 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
-import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
-import io.druid.indexer.Jobby;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
-import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
+import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
@@ -49,26 +51,30 @@ import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
-import java.util.SortedSet;
+import java.util.Map;
-public class HadoopIndexTask extends AbstractTask
+public class HadoopIndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(HadoopIndexTask.class);
+ private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
+
private static final ExtensionsConfig extensionsConfig;
static {
extensionsConfig = Initialization.makeStartupInjector().getInstance(ExtensionsConfig.class);
}
- private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
@JsonIgnore
private final HadoopDruidIndexerSchema schema;
+
@JsonIgnore
private final String hadoopCoordinates;
@@ -91,7 +97,13 @@ public class HadoopIndexTask extends AbstractTask
{
super(
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
- schema.getDataSource()
+ schema.getDataSource(),
+ JodaUtils.umbrellaInterval(
+ JodaUtils.condenseIntervals(
+ schema.getGranularitySpec()
+ .bucketIntervals()
+ )
+ )
);
// Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
@@ -109,22 +121,6 @@ public class HadoopIndexTask extends AbstractTask
return "index_hadoop";
}
- @Override
- public boolean isReady(TaskActionClient taskActionClient) throws Exception
- {
- Optional<SortedSet<Interval>> intervals = schema.getGranularitySpec().bucketIntervals();
- if (intervals.isPresent()) {
- Interval interval = JodaUtils.umbrellaInterval(
- JodaUtils.condenseIntervals(
- intervals.get()
- )
- );
- return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
- } else {
- return true;
- }
- }
-
@JsonProperty("config")
public HadoopDruidIndexerSchema getSchema()
{
@@ -171,60 +167,29 @@ public class HadoopIndexTask extends AbstractTask
jobUrls.addAll(extensionURLs);
System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(jobUrls));
- boolean determineIntervals = !schema.getGranularitySpec().bucketIntervals().isPresent();
- final Class<?> determineConfigurationMainClass = loader.loadClass(HadoopDetermineConfigInnerProcessing.class.getName());
- final Method determineConfigurationMainMethod = determineConfigurationMainClass.getMethod(
- "runTask",
- String[].class
- );
+ final Class<?> mainClass = loader.loadClass(HadoopIndexTaskInnerProcessing.class.getName());
+ final Method mainMethod = mainClass.getMethod("runTask", String[].class);
- String[] determineConfigArgs = new String[]{
+ // We should have a lock from before we started running
+ final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+ log.info("Setting version to: %s", myLock.getVersion());
+
+ String[] args = new String[]{
toolbox.getObjectMapper().writeValueAsString(schema),
+ myLock.getVersion(),
toolbox.getConfig().getHadoopWorkingPath(),
- toolbox.getSegmentPusher().getPathForHadoop(getDataSource())
+ toolbox.getSegmentPusher().getPathForHadoop(getDataSource()),
};
- String config = (String) determineConfigurationMainMethod.invoke(null, new Object[]{determineConfigArgs});
- HadoopDruidIndexerSchema indexerSchema = toolbox.getObjectMapper()
- .readValue(config, HadoopDruidIndexerSchema.class);
-
-
- // We should have a lock from before we started running only if interval was specified
- final String version;
- if (determineIntervals) {
- Interval interval = JodaUtils.umbrellaInterval(
- JodaUtils.condenseIntervals(
- indexerSchema.getGranularitySpec().bucketIntervals().get()
- )
- );
- TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
- version = lock.getVersion();
- } else {
- Iterable<TaskLock> locks = getTaskLocks(toolbox);
- final TaskLock myLock = Iterables.getOnlyElement(locks);
- version = myLock.getVersion();
- }
- log.info("Setting version to: %s", version);
-
- final Class<?> indexGeneratorMainClass = loader.loadClass(HadoopIndexGeneratorInnerProcessing.class.getName());
- final Method indexGeneratorMainMethod = indexGeneratorMainClass.getMethod("runTask", String[].class);
- String[] indexGeneratorArgs = new String[]{
- toolbox.getObjectMapper().writeValueAsString(indexerSchema),
- version
- };
- String segments = (String) indexGeneratorMainMethod.invoke(null, new Object[]{indexGeneratorArgs});
+ String segments = (String) mainMethod.invoke(null, new Object[]{args});
if (segments != null) {
-
List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
segments,
- new TypeReference<List<DataSegment>>()
- {
- }
+ new TypeReference<List<DataSegment>>() {}
);
-
toolbox.pushSegments(publishedSegments);
return TaskStatus.success(getId());
} else {
@@ -232,12 +197,14 @@ public class HadoopIndexTask extends AbstractTask
}
}
- public static class HadoopIndexGeneratorInnerProcessing
+ public static class HadoopIndexTaskInnerProcessing
{
public static String runTask(String[] args) throws Exception
{
final String schema = args[0];
- String version = args[1];
+ final String version = args[1];
+ final String workingPath = args[2];
+ final String segmentOutputPath = args[3];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
@@ -247,6 +214,12 @@ public class HadoopIndexTask extends AbstractTask
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withVersion(version)
+ .withWorkingPath(
+ workingPath
+ )
+ .withSegmentOutputPath(
+ segmentOutputPath
+ )
.build();
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
@@ -259,34 +232,4 @@ public class HadoopIndexTask extends AbstractTask
return null;
}
}
-
- public static class HadoopDetermineConfigInnerProcessing
- {
- public static String runTask(String[] args) throws Exception
- {
- final String schema = args[0];
- final String workingPath = args[1];
- final String segmentOutputPath = args[2];
-
- final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
- .readValue(
- schema,
- HadoopDruidIndexerSchema.class
- );
- final HadoopDruidIndexerConfig config =
- new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
- .withWorkingPath(workingPath)
- .withSegmentOutputPath(segmentOutputPath)
- .build();
-
- Jobby job = new HadoopDruidDetermineConfigurationJob(config);
-
- log.info("Starting a hadoop index generator job...");
- if (job.run()) {
- return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(HadoopDruidIndexerConfigBuilder.toSchema(config));
- }
-
- return null;
- }
- }
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index fdbaddfa2f1..c313b95b9df 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -109,8 +109,8 @@ public class IndexTask extends AbstractFixedIntervalTask
id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
dataSource,
new Interval(
- granularitySpec.bucketIntervals().get().first().getStart(),
- granularitySpec.bucketIntervals().get().last().getEnd()
+ granularitySpec.bucketIntervals().first().getStart(),
+ granularitySpec.bucketIntervals().last().getEnd()
)
);
@@ -137,7 +137,7 @@ public class IndexTask extends AbstractFixedIntervalTask
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
- final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals());
+ final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
if (validIntervals.isEmpty()) {
throw new ISE("No valid data intervals found. Check your configs!");
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index d8c62bbce1f..e9ace7ac18a 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -382,9 +382,11 @@ public class TaskSerdeTest
final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
+ Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
+ Assert.assertEquals(task.getInterval(), task2.getInterval());
}
}
diff --git a/server/src/main/java/io/druid/guice/ServerModule.java b/server/src/main/java/io/druid/guice/ServerModule.java
index d0ead89825d..3417ef3d82e 100644
--- a/server/src/main/java/io/druid/guice/ServerModule.java
+++ b/server/src/main/java/io/druid/guice/ServerModule.java
@@ -30,7 +30,6 @@ import io.druid.guice.annotations.Self;
import io.druid.initialization.DruidModule;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ZkPathsConfig;
-import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
@@ -64,8 +63,7 @@ public class ServerModule implements DruidModule
.registerSubtypes(
new NamedType(SingleDimensionShardSpec.class, "single"),
new NamedType(LinearShardSpec.class, "linear"),
- new NamedType(NumberedShardSpec.class, "numbered"),
- new NamedType(HashBasedNumberedShardSpec.class, "hashed")
+ new NamedType(NumberedShardSpec.class, "numbered")
)
);
}
diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
deleted file mode 100644
index 5f9635cd650..00000000000
--- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2014 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.timeline.partition;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.client.repackaged.com.google.common.base.Throwables;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import io.druid.data.input.InputRow;
-import io.druid.data.input.Rows;
-
-import java.util.List;
-
-public class HashBasedNumberedShardSpec extends NumberedShardSpec
-{
-
- private static final HashFunction hashFunction = Hashing.murmur3_32();
- @JacksonInject
- private ObjectMapper jsonMapper;
-
- @JsonCreator
- public HashBasedNumberedShardSpec(
- @JsonProperty("partitionNum") int partitionNum,
- @JsonProperty("partitions") int partitions
- )
- {
- super(partitionNum, partitions);
- }
-
- @Override
- public boolean isInChunk(InputRow inputRow)
- {
- return Math.abs(hash(inputRow)) % getPartitions() == getPartitionNum();
- }
-
- private int hash(InputRow inputRow)
- {
- final List<Object> groupKey = Rows.toGroupKey(inputRow.getTimestampFromEpoch(), inputRow);
- try {
- return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
- }
- catch (JsonProcessingException e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public String toString()
- {
- return "HashBasedNumberedShardSpec{" +
- "partitionNum=" + getPartitionNum() +
- ", partitions=" + getPartitions() +
- '}';
- }
-
-}
\ No newline at end of file
diff --git a/server/src/test/java/io/druid/TestUtil.java b/server/src/test/java/io/druid/TestUtil.java
index 9f47f2183fb..e184c77998f 100644
--- a/server/src/test/java/io/druid/TestUtil.java
+++ b/server/src/test/java/io/druid/TestUtil.java
@@ -19,12 +19,8 @@
package io.druid;
-import com.fasterxml.jackson.databind.BeanProperty;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.metamx.common.ISE;
import io.druid.guice.ServerModule;
import io.druid.jackson.DefaultObjectMapper;
@@ -41,20 +37,5 @@ public class TestUtil
for (Module module : list) {
MAPPER.registerModule(module);
}
- MAPPER.setInjectableValues(
- new InjectableValues()
- {
- @Override
- public Object findInjectableValue(
- Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
- )
- {
- if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
- return TestUtil.MAPPER;
- }
- throw new ISE("No Injectable value found");
- }
- }
- );
}
}
diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
deleted file mode 100644
index 9b30768ca40..00000000000
--- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.server.shard;
-
-import com.fasterxml.jackson.databind.BeanProperty;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.metamx.common.ISE;
-import io.druid.TestUtil;
-import io.druid.timeline.partition.HashBasedNumberedShardSpec;
-import io.druid.timeline.partition.PartitionChunk;
-import io.druid.timeline.partition.ShardSpec;
-import junit.framework.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-
-public class HashBasedNumberedShardSpecTest
-{
-
- @Test
- public void testSerdeRoundTrip() throws Exception
- {
-
- final ShardSpec spec = TestUtil.MAPPER.readValue(
- TestUtil.MAPPER.writeValueAsBytes(new HashBasedNumberedShardSpec(1, 2)),
- ShardSpec.class
- );
- Assert.assertEquals(1, spec.getPartitionNum());
- Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
- }
-
- @Test
- public void testSerdeBackwardsCompat() throws Exception
- {
- final ShardSpec spec = TestUtil.MAPPER.readValue(
- "{\"type\": \"hashed\", \"partitions\": 2, \"partitionNum\": 1}",
- ShardSpec.class
- );
- Assert.assertEquals(1, spec.getPartitionNum());
- Assert.assertEquals(2, ((HashBasedNumberedShardSpec) spec).getPartitions());
- }
-
- @Test
- public void testPartitionChunks()
- {
- final List<ShardSpec> specs = ImmutableList.of(
- new HashBasedNumberedShardSpec(0, 3),
- new HashBasedNumberedShardSpec(1, 3),
- new HashBasedNumberedShardSpec(2, 3)
- );
-
- final List<PartitionChunk<String>> chunks = Lists.transform(
- specs,
- new Function<ShardSpec, PartitionChunk<String>>()
- {
- @Override
- public PartitionChunk<String> apply(ShardSpec shardSpec)
- {
- return shardSpec.createChunk("rofl");
- }
- }
- );
-
- Assert.assertEquals(0, chunks.get(0).getChunkNumber());
- Assert.assertEquals(1, chunks.get(1).getChunkNumber());
- Assert.assertEquals(2, chunks.get(2).getChunkNumber());
-
- Assert.assertTrue(chunks.get(0).isStart());
- Assert.assertFalse(chunks.get(1).isStart());
- Assert.assertFalse(chunks.get(2).isStart());
-
- Assert.assertFalse(chunks.get(0).isEnd());
- Assert.assertFalse(chunks.get(1).isEnd());
- Assert.assertTrue(chunks.get(2).isEnd());
-
- Assert.assertTrue(chunks.get(0).abuts(chunks.get(1)));
- Assert.assertTrue(chunks.get(1).abuts(chunks.get(2)));
-
- Assert.assertFalse(chunks.get(0).abuts(chunks.get(0)));
- Assert.assertFalse(chunks.get(0).abuts(chunks.get(2)));
- Assert.assertFalse(chunks.get(1).abuts(chunks.get(0)));
- Assert.assertFalse(chunks.get(1).abuts(chunks.get(1)));
- Assert.assertFalse(chunks.get(2).abuts(chunks.get(0)));
- Assert.assertFalse(chunks.get(2).abuts(chunks.get(1)));
- Assert.assertFalse(chunks.get(2).abuts(chunks.get(2)));
- }
-
-}