diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index df47c2574ca..76cf0774861 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -420,7 +420,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Object get() { - final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; + final String[][] dims = currEntry.getKey().getDims(); + if(dimensionIndex >= dims.length) { + return null; + } + final String[] dimVals = dims[dimensionIndex]; if (dimVals.length == 1) { return dimVals[0]; } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 1ce05305b5d..7f17d7fc2e9 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -34,6 +34,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.JavaScriptAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.DimFilters; import io.druid.query.groupby.GroupByQuery; @@ -52,6 +53,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; /** */ @@ -79,26 +81,7 @@ public class IncrementalIndexStorageAdapterTest ) ); - GroupByQueryEngine engine = new GroupByQueryEngine( - Suppliers.ofInstance(new GroupByQueryConfig() - { - @Override - public int getMaxIntermediateRows() - { - return 5; - } - }), - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(50000); - } - } - ) - ); + GroupByQueryEngine engine = makeGroupByQueryEngine(); final Sequence rows = engine.process( GroupByQuery.builder() @@ -123,6 +106,93 @@ public class IncrementalIndexStorageAdapterTest Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent()); } + @Test + public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception + { + IncrementalIndex index = new IncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + ); + + index.add( + new MapBasedInputRow( + new DateTime("2014-09-01T00:00:00"), + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "hi") + ) + ); + index.add( + new MapBasedInputRow( + new DateTime("2014-09-01T01:00:00"), + Lists.newArrayList("billy", "sally"), + ImmutableMap.of( + "billy", "hip", + "sally", "hop" + ) + ) + ); + + GroupByQueryEngine engine = makeGroupByQueryEngine(); + + final Sequence rows = engine.process( + GroupByQuery.builder() + .setDataSource("test") + .setGranularity(QueryGranularity.ALL) + .setInterval(new Interval(0, new DateTime().getMillis())) + .addDimension("billy") + .addDimension("sally") + .addAggregator( + new LongSumAggregatorFactory("cnt", "cnt") + ) + .addAggregator( + new JavaScriptAggregatorFactory( + "fieldLength", + Arrays.asList("sally", "billy"), + "function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }", + "function() { return 0; }", + "function(a,b) { return a + b; }" + ) + ) + .build(), + new IncrementalIndexStorageAdapter(index) + ); + + final ArrayList results = Sequences.toList(rows, Lists.newArrayList()); + + Assert.assertEquals(2, results.size()); + + MapBasedRow row = (MapBasedRow) results.get(0); + Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l, "fieldLength", 2.0), row.getEvent()); + + row = (MapBasedRow) results.get(1); + Assert.assertEquals(ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1l, "fieldLength", 6.0), row.getEvent()); + } + + private static GroupByQueryEngine makeGroupByQueryEngine() + { + return new GroupByQueryEngine( + Suppliers.ofInstance( + new GroupByQueryConfig() + { + @Override + public int getMaxIntermediateRows() + { + return 5; + } + } + ), + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(50000); + } + } + ) + ); + } + @Test public void testResetSanity() { IncrementalIndex index = new IncrementalIndex( @@ -236,60 +306,41 @@ public class IncrementalIndexStorageAdapterTest 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); - index.add( - new MapBasedInputRow( - new DateTime().minus(1).getMillis(), - Lists.newArrayList("billy"), - ImmutableMap.of("billy", "hi") - ) - ); - index.add( - new MapBasedInputRow( - new DateTime().minus(1).getMillis(), - Lists.newArrayList("sally"), - ImmutableMap.of("sally", "bo") - ) - ); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "hi") + ) + ); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("sally"), + ImmutableMap.of("sally", "bo") + ) + ); - GroupByQueryEngine engine = new GroupByQueryEngine( - Suppliers.ofInstance(new GroupByQueryConfig() - { - @Override - public int getMaxIntermediateRows() - { - return 5; - } - }), - new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(50000); - } - } - ) - ); + GroupByQueryEngine engine = makeGroupByQueryEngine(); - final Sequence rows = engine.process( - GroupByQuery.builder() - .setDataSource("test") - .setGranularity(QueryGranularity.ALL) - .setInterval(new Interval(0, new DateTime().getMillis())) - .addDimension("billy") - .addDimension("sally") - .addAggregator(new LongSumAggregatorFactory("cnt", "cnt")) - .setDimFilter(DimFilters.dimEquals("sally", (String) null)) - .build(), - new IncrementalIndexStorageAdapter(index) - ); + final Sequence rows = engine.process( + GroupByQuery.builder() + .setDataSource("test") + .setGranularity(QueryGranularity.ALL) + .setInterval(new Interval(0, new DateTime().getMillis())) + .addDimension("billy") + .addDimension("sally") + .addAggregator(new LongSumAggregatorFactory("cnt", "cnt")) + .setDimFilter(DimFilters.dimEquals("sally", (String) null)) + .build(), + new IncrementalIndexStorageAdapter(index) + ); - final ArrayList results = Sequences.toList(rows, Lists.newArrayList()); + final ArrayList results = Sequences.toList(rows, Lists.newArrayList()); - Assert.assertEquals(1, results.size()); + Assert.assertEquals(1, results.size()); - MapBasedRow row = (MapBasedRow) results.get(0); - Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent()); - } + MapBasedRow row = (MapBasedRow) results.get(0); + Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent()); + } }