fix column selector on varying incr index schema

This commit is contained in:
Xavier Léauté 2014-09-17 10:35:58 -07:00
parent 8b387034a3
commit e5a13544a0
2 changed files with 128 additions and 73 deletions

View File

@ -420,7 +420,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override @Override
public Object get() public Object get()
{ {
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; final String[][] dims = currEntry.getKey().getDims();
if(dimensionIndex >= dims.length) {
return null;
}
final String[] dimVals = dims[dimensionIndex];
if (dimVals.length == 1) { if (dimVals.length == 1) {
return dimVals[0]; return dimVals[0];
} }

View File

@ -34,6 +34,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.DimFilters; import io.druid.query.filter.DimFilters;
import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQuery;
@ -52,6 +53,7 @@ import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
/** /**
*/ */
@ -79,26 +81,7 @@ public class IncrementalIndexStorageAdapterTest
) )
); );
GroupByQueryEngine engine = new GroupByQueryEngine( GroupByQueryEngine engine = makeGroupByQueryEngine();
Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
{
@Override
public int getMaxIntermediateRows()
{
return 5;
}
}),
new StupidPool(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(50000);
}
}
)
);
final Sequence<Row> rows = engine.process( final Sequence<Row> rows = engine.process(
GroupByQuery.builder() GroupByQuery.builder()
@ -123,6 +106,93 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent()); Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
} }
@Test
public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
index.add(
new MapBasedInputRow(
new DateTime("2014-09-01T00:00:00"),
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "hi")
)
);
index.add(
new MapBasedInputRow(
new DateTime("2014-09-01T01:00:00"),
Lists.newArrayList("billy", "sally"),
ImmutableMap.<String, Object>of(
"billy", "hip",
"sally", "hop"
)
)
);
GroupByQueryEngine engine = makeGroupByQueryEngine();
final Sequence<Row> rows = engine.process(
GroupByQuery.builder()
.setDataSource("test")
.setGranularity(QueryGranularity.ALL)
.setInterval(new Interval(0, new DateTime().getMillis()))
.addDimension("billy")
.addDimension("sally")
.addAggregator(
new LongSumAggregatorFactory("cnt", "cnt")
)
.addAggregator(
new JavaScriptAggregatorFactory(
"fieldLength",
Arrays.asList("sally", "billy"),
"function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }",
"function() { return 0; }",
"function(a,b) { return a + b; }"
)
)
.build(),
new IncrementalIndexStorageAdapter(index)
);
final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());
Assert.assertEquals(2, results.size());
MapBasedRow row = (MapBasedRow) results.get(0);
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l, "fieldLength", 2.0), row.getEvent());
row = (MapBasedRow) results.get(1);
Assert.assertEquals(ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1l, "fieldLength", 6.0), row.getEvent());
}
private static GroupByQueryEngine makeGroupByQueryEngine()
{
return new GroupByQueryEngine(
Suppliers.<GroupByQueryConfig>ofInstance(
new GroupByQueryConfig()
{
@Override
public int getMaxIntermediateRows()
{
return 5;
}
}
),
new StupidPool(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(50000);
}
}
)
);
}
@Test @Test
public void testResetSanity() { public void testResetSanity() {
IncrementalIndex index = new IncrementalIndex( IncrementalIndex index = new IncrementalIndex(
@ -251,26 +321,7 @@ public class IncrementalIndexStorageAdapterTest
) )
); );
GroupByQueryEngine engine = new GroupByQueryEngine( GroupByQueryEngine engine = makeGroupByQueryEngine();
Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
{
@Override
public int getMaxIntermediateRows()
{
return 5;
}
}),
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(50000);
}
}
)
);
final Sequence<Row> rows = engine.process( final Sequence<Row> rows = engine.process(
GroupByQuery.builder() GroupByQuery.builder()