diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index ee580ac23b7..82643065123 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -158,22 +158,27 @@ public class DetermineHashedPartitionsJob implements Jobby
         fileSystem = partitionInfoPath.getFileSystem(groupByJob.getConfiguration());
       }
       if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
-        Long cardinality = config.jsonMapper.readValue(
-            Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
-        {
-        }
-        );
-        final int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
-
-        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
-        if (numberOfShards == 1) {
-          actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
-        } else {
-          for (int i = 0; i < numberOfShards; ++i) {
-            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
-            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+        final Long numRows = config.jsonMapper.readValue(
+            Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
+            {
+            }
+        );
+
+        log.info("Found approximately [%,d] rows in data.", numRows);
+
+        final int numberOfShards = (int) Math.ceil((double) numRows / config.getTargetPartitionSize());
+
+        log.info("Creating [%,d] shards", numberOfShards);
+
+        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
+        if (numberOfShards == 1) {
+          actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
+        } else {
+          for (int i = 0; i < numberOfShards; ++i) {
+            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
+            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
+          }
         }
-      }
 
         shardSpecs.put(bucket, actualSpecs);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 853e0708fab..5956287f5d0 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -37,6 +37,7 @@ import com.google.inject.Module;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
+import com.metamx.common.parsers.TimestampParser;
 import io.druid.common.utils.JodaUtils;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.StringInputRowParser;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index f406c747a70..32565f1d51a 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -34,6 +35,7 @@ import com.google.common.primitives.Ints;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.Comparators;
 import com.metamx.common.logger.Logger;
+import com.metamx.common.parsers.TimestampParser;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
 import io.druid.data.input.InputRow;
@@ -551,4 +553,12 @@ public class IndexTask extends AbstractFixedIntervalTask
       return rowFlushBoundary;
     }
   }
+
+
+  public static void main(String[] args)
+  {
+    Function<String, DateTime> parser = TimestampParser.createTimestampParser("millis");
+    parser.apply("1401266370985");
+
+  }
 }
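
Note (not part of the patch): the shard-count arithmetic introduced in DetermineHashedPartitionsJob rounds up, so no shard is planned to hold more than the target partition size. A minimal standalone sketch of the same formula, with hypothetical inputs (the real values come from the partition-info file and config.getTargetPartitionSize()):

public class ShardCountSketch
{
  public static void main(String[] args)
  {
    // Hypothetical inputs; in the patch these are numRows (read from the
    // partition-info file) and config.getTargetPartitionSize().
    final long numRows = 2500000L;
    final long targetPartitionSize = 1000000L;

    // Same formula as the patch: ceil(numRows / targetPartitionSize).
    final int numberOfShards = (int) Math.ceil((double) numRows / targetPartitionSize);

    // Prints 3: two full shards of 1,000,000 rows plus one partial shard.
    System.out.println(numberOfShards);
  }
}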
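Note (not part of the patch): the main() added to IndexTask exercises TimestampParser with the "millis" format, which interprets the input string as milliseconds since the Unix epoch. A self-contained sketch of the same call (the class name is mine; the parser calls match the patch):

import com.google.common.base.Function;
import com.metamx.common.parsers.TimestampParser;
import org.joda.time.DateTime;

public class MillisParseSketch
{
  public static void main(String[] args)
  {
    // "millis" selects epoch-milliseconds parsing.
    final Function<String, DateTime> parser = TimestampParser.createTimestampParser("millis");

    // 1401266370985 ms after the epoch is the instant 2014-05-28T08:39:30.985Z.
    final DateTime parsed = parser.apply("1401266370985");
    System.out.println(parsed);
  }
}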