diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsUsingCardinalityJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
similarity index 91%
rename from indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsUsingCardinalityJob.java
rename to indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index bde1dfb4955..3b824d70794 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsUsingCardinalityJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -62,31 +62,22 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * Determines appropriate ShardSpecs for a job by determining approximate cardinality of data set
+ * Determines appropriate ShardSpecs for a job by determining approximate cardinality of data set using HyperLogLog
  */
-public class DeterminePartitionsUsingCardinalityJob implements Jobby
+public class DetermineHashedPartitionsJob implements Jobby
 {
   private static final int MAX_SHARDS = 128;
-  private static final Logger log = new Logger(DeterminePartitionsUsingCardinalityJob.class);
+  private static final Logger log = new Logger(DetermineHashedPartitionsJob.class);
   private final HadoopDruidIndexerConfig config;
+  private static final int HYPER_LOG_LOG_BIT_SIZE = 20;
 
-  public DeterminePartitionsUsingCardinalityJob(
+  public DetermineHashedPartitionsJob(
       HadoopDruidIndexerConfig config
   )
   {
     this.config = config;
   }
 
-  public static void injectSystemProperties(Job job)
-  {
-    final Configuration conf = job.getConfiguration();
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-  }
-
   public boolean run()
   {
     try {
@@ -97,10 +88,10 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
       long startTime = System.currentTimeMillis();
       final Job groupByJob = new Job(
           new Configuration(),
-          String.format("%s-determine_cardinality_grouped-%s", config.getDataSource(), config.getIntervals())
+          String.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
       );
 
-      injectSystemProperties(groupByJob);
+      JobHelper.injectSystemProperties(groupByJob);
       groupByJob.setInputFormatClass(TextInputFormat.class);
       groupByJob.setMapperClass(DetermineCardinalityMapper.class);
       groupByJob.setMapOutputKeyClass(LongWritable.class);
@@ -145,6 +136,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
        log.info("Determined Intervals for Job [%s]" + config.getSegmentGranularIntervals());
      }
      Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
+     int shardCount = 0;
      for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
        DateTime bucket = segmentGranularity.getStart();
 
@@ -161,14 +153,17 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
        int numberOfShards = (int) Math.ceil((double) cardinality / config.getTargetPartitionSize());
 
        if (numberOfShards > MAX_SHARDS) {
-         numberOfShards = MAX_SHARDS;
+         throw new ISE(
+             "Number of shards [%d] exceed the maximum limit of [%d], either targetPartitionSize is too low or data volume is too high",
+             numberOfShards,
+             MAX_SHARDS
+         );
        }
 
        List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(numberOfShards);
        if (numberOfShards == 1) {
-         actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), 0));
+         actualSpecs.add(new HadoopyShardSpec(new NoneShardSpec(), shardCount++));
        } else {
-         int shardCount = 0;
          for (int i = 0; i < numberOfShards; ++i) {
            actualSpecs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, numberOfShards), shardCount++));
            log.info("DateTime[%s], partition[%d], spec[%s]", bucket, i, actualSpecs.get(i));
@@ -183,9 +178,8 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
      }
      config.setShardSpecs(shardSpecs);
      log.info(
-         "Determine partitions Using cardinality took %d millis shardSpecs %s",
-         (System.currentTimeMillis() - startTime),
-         shardSpecs
+         "DetermineHashedPartitionsJob took %d millis",
+         (System.currentTimeMillis() - startTime)
      );
 
      return true;
@@ -218,7 +212,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
        determineIntervals = false;
        final ImmutableMap.Builder<Interval, HyperLogLog> builder = ImmutableMap.builder();
        for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
-         builder.put(bucketInterval, new HyperLogLog(20));
+         builder.put(bucketInterval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
        }
        hyperLogLogs = builder.build();
      }
@@ -249,7 +243,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
        interval = config.getGranularitySpec().getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch()));
 
        if (!hyperLogLogs.containsKey(interval)) {
-         hyperLogLogs.put(interval, new HyperLogLog(20));
+         hyperLogLogs.put(interval, new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE));
        }
      } else {
        final Optional<Interval> maybeInterval = config.getGranularitySpec()
@@ -307,7 +301,7 @@ public class DeterminePartitionsUsingCardinalityJob implements Jobby
        Context context
     ) throws IOException, InterruptedException
     {
-      HyperLogLog aggregate = new HyperLogLog(20);
+      HyperLogLog aggregate = new HyperLogLog(HYPER_LOG_LOG_BIT_SIZE);
       for (BytesWritable value : values) {
         HyperLogLog logValue = HyperLogLog.Builder.build(value.getBytes());
         try {
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 4af000ed859..38486c6a16a 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -107,16 +107,6 @@ public class DeterminePartitionsJob implements Jobby
     this.config = config;
   }
 
-  public static void injectSystemProperties(Job job)
-  {
-    final Configuration conf = job.getConfiguration();
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-  }
-
   public boolean run()
   {
     try {
@@ -131,7 +121,7 @@ public class DeterminePartitionsJob implements Jobby
           String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
       );
 
-      injectSystemProperties(groupByJob);
+      JobHelper.injectSystemProperties(groupByJob);
       groupByJob.setInputFormatClass(TextInputFormat.class);
       groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
       groupByJob.setMapOutputKeyClass(BytesWritable.class);
@@ -168,7 +158,7 @@ public class DeterminePartitionsJob implements Jobby
 
       dimSelectionJob.getConfiguration().set("io.sort.record.percent", "0.19");
 
-      injectSystemProperties(dimSelectionJob);
+      JobHelper.injectSystemProperties(dimSelectionJob);
 
       if (!config.getPartitionsSpec().isAssumeGrouped()) {
         // Read grouped data from the groupByJob.
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
index a471e09acf9..53160cb4f1d 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java
@@ -74,7 +74,7 @@ public class HadoopDruidIndexerJob implements Jobby
 
     if (config.isDeterminingPartitions()) {
       if(config.getPartitionDimension() == null){
-        jobs.add(new DeterminePartitionsUsingCardinalityJob(config));
+        jobs.add(new DetermineHashedPartitionsJob(config));
       } else {
         jobs.add(new DeterminePartitionsJob(config));
       }
@@ -143,12 +143,7 @@ public class HadoopDruidIndexerJob implements Jobby
       );
 
       job.getConfiguration().set("io.sort.record.percent", "0.19");
-      for (String propName : System.getProperties().stringPropertyNames()) {
-        Configuration conf = job.getConfiguration();
-        if (propName.startsWith("hadoop.")) {
-          conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-        }
-      }
+      JobHelper.injectSystemProperties(job);
 
       config.addInputPaths(job);
     }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index e896ad89c97..39ad77fe8a4 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -84,9 +84,7 @@ import java.util.zip.ZipOutputStream;
 public class IndexGeneratorJob implements Jobby
 {
   private static final Logger log = new Logger(IndexGeneratorJob.class);
-
   private final HadoopDruidIndexerConfig config;
-
   private IndexGeneratorStats jobStats;
 
   public IndexGeneratorJob(
@@ -97,6 +95,39 @@ public class IndexGeneratorJob implements Jobby
     this.jobStats = new IndexGeneratorStats();
   }
 
+  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
+  {
+
+    final Configuration conf = new Configuration();
+    final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
+
+    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
+
+    for (String propName : System.getProperties().stringPropertyNames()) {
+      if (propName.startsWith("hadoop.")) {
+        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+      }
+    }
+
+    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
+
+    try {
+      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
+
+      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
+        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
+        publishedSegmentsBuilder.add(segment);
+        log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
+      }
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
+
+    return publishedSegments;
+  }
+
   public IndexGeneratorStats getJobStats()
   {
     return jobStats;
@@ -112,12 +143,7 @@ public class IndexGeneratorJob implements Jobby
 
       job.getConfiguration().set("io.sort.record.percent", "0.23");
 
-      for (String propName : System.getProperties().stringPropertyNames()) {
-        Configuration conf = job.getConfiguration();
-        if (propName.startsWith("hadoop.")) {
-          conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-        }
-      }
+      JobHelper.injectSystemProperties(job);
 
       job.setInputFormatClass(TextInputFormat.class);
 
@@ -156,39 +182,6 @@ public class IndexGeneratorJob implements Jobby
     }
   }
 
-  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
-  {
-
-    final Configuration conf = new Configuration();
-    final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
-
-    ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
-
-    for (String propName : System.getProperties().stringPropertyNames()) {
-      if (propName.startsWith("hadoop.")) {
-        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
-      }
-    }
-
-    final Path descriptorInfoDir = config.makeDescriptorInfoDir();
-
-    try {
-      FileSystem fs = descriptorInfoDir.getFileSystem(conf);
-
-      for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
-        final DataSegment segment = jsonMapper.readValue(fs.open(status.getPath()), DataSegment.class);
-        publishedSegmentsBuilder.add(segment);
-        log.info("Adding segment %s to the list of published segments", segment.getIdentifier());
-      }
-    }
-    catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-    List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
-
-    return publishedSegments;
-  }
-
   public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper
   {
 
@@ -225,7 +218,15 @@ public class IndexGeneratorJob implements Jobby
       final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
       bytes.position(4); // Skip length added by SortableBytes
       int shardNum = bytes.getInt();
-      return shardNum % numPartitions;
+      if (System.getProperty("mapred.job.tracker").equals("local")) {
+        return shardNum % numPartitions;
+      } else {
+        if (shardNum >= numPartitions) {
+          throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
+        }
+        return shardNum;
+
+      }
     }
   }
 
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 654f70b5b4d..3d61538cab2 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -94,4 +94,14 @@ public class JobHelper
       }
     }
   }
+
+  public static void injectSystemProperties(Job job)
+  {
+    final Configuration conf = job.getConfiguration();
+    for (String propName : System.getProperties().stringPropertyNames()) {
+      if (propName.startsWith("hadoop.")) {
+        conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 7720462e808..a7e193f1dcf 100644
--- a/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/server/src/main/java/io/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -39,9 +39,9 @@ import java.util.Set;
 public class HashBasedNumberedShardSpec extends NumberedShardSpec
 {
-  private static HashFunction hashFunction = null;
+  private static final HashFunction hashFunction = Hashing.murmur3_32();
 
   @JacksonInject
-  public ObjectMapper jsonMapper;
+  private ObjectMapper jsonMapper;
 
   @JsonCreator
   public HashBasedNumberedShardSpec(
@@ -50,7 +50,6 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
   )
   {
     super(partitionNum, partitions);
-    hashFunction = Hashing.murmur3_32();
   }
 
   @Override
@@ -90,4 +89,4 @@ public class HashBasedNumberedShardSpec extends NumberedShardSpec
            '}';
   }
 
-}
\ No newline at end of file
+}
diff --git a/server/src/test/java/io/druid/TestUtil.java b/server/src/test/java/io/druid/TestUtil.java
index e184c77998f..9f47f2183fb 100644
--- a/server/src/test/java/io/druid/TestUtil.java
+++ b/server/src/test/java/io/druid/TestUtil.java
@@ -19,8 +19,12 @@
 
 package io.druid;
 
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.common.ISE;
 import io.druid.guice.ServerModule;
 import io.druid.jackson.DefaultObjectMapper;
 
@@ -37,5 +41,20 @@ public class TestUtil
     for (Module module : list) {
       MAPPER.registerModule(module);
     }
+    MAPPER.setInjectableValues(
+        new InjectableValues()
+        {
+          @Override
+          public Object findInjectableValue(
+              Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
+          )
+          {
+            if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
+              return TestUtil.MAPPER;
+            }
+            throw new ISE("No Injectable value found");
+          }
+        }
+    );
   }
 }
diff --git a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
index 59fa866beb4..9b30768ca40 100644
--- a/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
+++ b/server/src/test/java/io/druid/server/shard/HashBasedNumberedShardSpecTest.java
@@ -38,25 +38,6 @@ import java.util.List;
 
 public class HashBasedNumberedShardSpecTest
 {
-  @Before
-  public void setup()
-  {
-    TestUtil.MAPPER.setInjectableValues(
-        new InjectableValues()
-        {
-          @Override
-          public Object findInjectableValue(
-              Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
-          )
-          {
-            if (valueId.equals("com.fasterxml.jackson.databind.ObjectMapper")) {
-              return TestUtil.MAPPER;
-            }
-            throw new ISE("No Injectable value found");
-          }
-        }
-    );
-  }
 
   @Test
   public void testSerdeRoundTrip() throws Exception