From 41e88baecaec5a97f34655a3c93a61283e90f3a0 Mon Sep 17 00:00:00 2001
From: nishantmonu51
Date: Wed, 15 Oct 2014 23:09:28 +0530
Subject: [PATCH] Add test for bucket selection

---
 .../indexer/HadoopDruidIndexerConfigTest.java      | 83 +++++++++++++++++++
 1 file changed, 83 insertions(+)

diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index ee8fa9315c5..1fe6068f2ad 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -20,15 +20,34 @@ package io.druid.indexer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.Granularity;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.data.input.impl.JSONDataSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.indexer.rollup.DataRollupSpec;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  */
 public class HadoopDruidIndexerConfigTest
@@ -125,4 +144,68 @@ public class HadoopDruidIndexerConfigTest
     );
   }
+
+  @Test
+  public void testHashedBucketSelection() {
+    List<HadoopyShardSpec> specs = Lists.newArrayList();
+    final int partitionCount = 10;
+    for (int i = 0; i < partitionCount; i++) {
+      specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
+    }
+    HadoopIngestionSpec spec = new HadoopIngestionSpec(
+        null, null, null,
+        "foo",
+        new TimestampSpec("timestamp", "auto"),
+        new JSONDataSpec(ImmutableList.of("foo"), null),
+        new UniformGranularitySpec(
+            Granularity.HOUR,
+            QueryGranularity.MINUTE,
+            ImmutableList.of(new Interval("2010-01-01/P1D")),
+            Granularity.HOUR
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        false,
+        true,
+        ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
+        false,
+        new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.MINUTE),
+        null,
+        false,
+        ImmutableMap.of("foo", "bar"),
+        false,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSchema(spec);
+    final List<String> dims = Arrays.asList("diM1", "dIM2");
+    final ImmutableMap<String, Object> values = ImmutableMap.<String, Object>of(
+        "Dim1",
+        "1",
+        "DiM2",
+        "2",
+        "dim1",
+        "3",
+        "dim2",
+        "4"
+    );
+    final long timestamp = new DateTime("2010-01-01T01:00:01").getMillis();
+    final Bucket expectedBucket = config.getBucket(new MapBasedInputRow(timestamp, dims, values)).get();
+    final long nextBucketTimestamp = QueryGranularity.MINUTE.next(QueryGranularity.MINUTE.truncate(timestamp));
+    // check that all rows with the same set of dims and the same truncated timestamp hash to the same bucket
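+    // every timestamp generated below stays within the same MINUTE granule as `timestamp`,
+    // so each row is expected to receive the same partitionNum as expectedBucket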
+    for (int i = 0; timestamp + i < nextBucketTimestamp; i++) {
+      Assert.assertEquals(
+          expectedBucket.partitionNum,
+          config.getBucket(new MapBasedInputRow(timestamp + i, dims, values)).get().partitionNum
+      );
+    }
+
+  }
 }