diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
index 9186d4b304e..556432a64ee 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
@@ -32,6 +32,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
 {
   private HadoopDruidIndexerConfig config;
   private StringInputRowParser parser;
+  protected GranularitySpec granularitySpec;
 
   @Override
   protected void setup(Context context)
@@ -39,6 +40,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
   {
     config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     parser = config.getParser();
+    granularitySpec = config.getGranularitySpec();
   }
 
   public HadoopDruidIndexerConfig getConfig()
@@ -69,9 +71,10 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
         throw e;
       }
     }
-    GranularitySpec spec = config.getGranularitySpec();
-    if (!spec.bucketIntervals().isPresent() || spec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
-                                                   .isPresent()) {
+
+    if (!granularitySpec.bucketIntervals().isPresent()
+        || granularitySpec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
+                          .isPresent()) {
       innerMap(inputRow, value, context);
     }
   }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index e9cbc2c2cb0..a964023e342 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import com.google.common.io.Closeables;
 import com.google.common.primitives.Longs;
 import com.metamx.common.IAE;
@@ -34,7 +36,9 @@ import com.metamx.common.guava.CloseQuietly;
 import com.metamx.common.logger.Logger;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
+import io.druid.data.input.Rows;
 import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.granularity.QueryGranularity;
 import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexIO;
@@ -211,6 +215,8 @@ public class IndexGeneratorJob implements Jobby
 
   public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper
   {
+    private static final HashFunction hashFunction = Hashing.murmur3_128();
+
     @Override
     protected void innerMap(
         InputRow inputRow,
@@ -225,10 +231,24 @@ public class IndexGeneratorJob implements Jobby
         throw new ISE("WTF?! No bucket found for row: %s", inputRow);
       }
 
+      final long truncatedTimestamp = granularitySpec.getQueryGranularity().truncate(inputRow.getTimestampFromEpoch());
+      final byte[] hashedDimensions = hashFunction.hashBytes(
+          HadoopDruidIndexerConfig.jsonMapper.writeValueAsBytes(
+              Rows.toGroupKey(
+                  truncatedTimestamp,
+                  inputRow
+              )
+          )
+      ).asBytes();
+
       context.write(
           new SortableBytes(
               bucket.get().toGroupKey(),
-              Longs.toByteArray(inputRow.getTimestampFromEpoch())
+              // sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side
+              ByteBuffer.allocate(Longs.BYTES + hashedDimensions.length)
+                        .putLong(truncatedTimestamp)
+                        .put(hashedDimensions)
+                        .array()
           ).toBytesWritable(),
           text
       );
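
For reference, below is a minimal standalone sketch of the sort-key layout the patched IndexGeneratorMapper writes: an 8-byte granularity-truncated timestamp followed by the 16-byte murmur3_128 hash of the row's group key. This is not the Druid code itself; the SortKeyExample class, its buildSortKey method, and the plain Jackson ObjectMapper used as a stand-in for Rows.toGroupKey / HadoopDruidIndexerConfig.jsonMapper are hypothetical, illustration-only names, but the Guava hashing and ByteBuffer calls mirror those in the patch.

// Hypothetical illustration, not part of the patch.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

public class SortKeyExample
{
  private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128();
  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

  // Builds a mapper sort key shaped like the one written in the patched innerMap():
  // an 8-byte truncated timestamp followed by the 16-byte murmur3_128 hash of the
  // serialized group key.
  public static byte[] buildSortKey(long truncatedTimestamp, Map<String, Object> dimensions) throws Exception
  {
    // Stand-in for Rows.toGroupKey(): serialize the truncated timestamp together with
    // the dimensions (sorted for stability) so identical rows hash to identical bytes.
    final byte[] groupKeyBytes = JSON_MAPPER.writeValueAsBytes(
        new Object[]{truncatedTimestamp, new TreeMap<>(dimensions)}
    );
    final byte[] hashedDimensions = HASH_FUNCTION.hashBytes(groupKeyBytes).asBytes();

    // Timestamp first so rows sort by time; hash second so rows with the same group
    // key land next to each other, which is what reduces spilling on the reducer side.
    return ByteBuffer.allocate(Longs.BYTES + hashedDimensions.length)
                     .putLong(truncatedTimestamp)
                     .put(hashedDimensions)
                     .array();
  }

  public static void main(String[] args) throws Exception
  {
    final byte[] key = buildSortKey(
        1400000000000L,
        ImmutableMap.<String, Object>of("page", "druid", "country", "US")
    );
    System.out.println("sort key length = " + key.length); // 8 + 16 = 24 bytes
  }
}

The main() call only demonstrates that the resulting key has a fixed 24-byte layout (8-byte timestamp prefix plus 16-byte hash), which is why the mapper can concatenate the two parts with a single ByteBuffer.allocate.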