diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 3ce3bc6bd02..673a79d6686 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.metamx.common.IAE; @@ -38,7 +39,6 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; -import io.druid.data.input.impl.NewSpatialDimensionSchema; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; @@ -101,7 +101,7 @@ public abstract class IncrementalIndex implements Iterable, @Override public String apply(final Object o) { - return String.valueOf(o); + return o == null ? null : String.valueOf(o); } }; @@ -114,8 +114,9 @@ public abstract class IncrementalIndex implements Iterable, return null; } if (o instanceof String) { + String s = (String) o; try { - return Long.valueOf((String) o); + return s.isEmpty() ? null : Long.valueOf(s); } catch (NumberFormatException nfe) { throw new ParseException(nfe, "Unable to parse value[%s] as long in column: ", o); @@ -137,8 +138,9 @@ public abstract class IncrementalIndex implements Iterable, return null; } if (o instanceof String) { + String s = (String) o; try { - return Float.valueOf((String) o); + return s.isEmpty() ? null : Float.valueOf(s); } catch (NumberFormatException nfe) { throw new ParseException(nfe, "Unable to parse value[%s] as float in column: ", o); @@ -355,6 +357,9 @@ public abstract class IncrementalIndex implements Iterable, private final Map columnCapabilities; private final List dimValues; + // looks need a configuration + private final Ordering ordering = Ordering.natural().nullsFirst(); + private final AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. @@ -714,7 +719,7 @@ public abstract class IncrementalIndex implements Iterable, } Comparable[] dimArray = dimValues.toArray(new Comparable[dimValues.size()]); - Arrays.sort(dimArray); + Arrays.sort(dimArray, ordering); final int[] retVal = new int[dimArray.length]; diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 0fd115b45c6..899e42b1f59 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -25,13 +25,19 @@ import com.google.common.collect.Lists; import com.metamx.common.ISE; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.Row; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.FloatDimensionSchema; +import io.druid.data.input.impl.LongDimensionSchema; +import io.druid.data.input.impl.StringDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; -import io.druid.segment.CloserRule; import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.filter.SelectorDimFilter; +import io.druid.segment.CloserRule; import org.joda.time.DateTime; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -66,6 +72,25 @@ public class IncrementalIndexTest @Parameterized.Parameters public static Collection constructorFeeder() throws IOException { + DimensionsSpec dimensions = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("string"), + new FloatDimensionSchema("float"), + new LongDimensionSchema("long") + ), null, null + ); + AggregatorFactory[] metrics = { + new FilteredAggregatorFactory( + new CountAggregatorFactory("cnt"), + new SelectorDimFilter("billy", "A") + ) + }; + final IncrementalIndexSchema schema = new IncrementalIndexSchema( + 0, + QueryGranularity.MINUTE, + dimensions, + metrics + ); return Arrays.asList( new Object[][]{ { @@ -74,20 +99,9 @@ public class IncrementalIndexTest @Override public IncrementalIndex createIndex() { - return new OnheapIncrementalIndex( - 0, - QueryGranularity.MINUTE, - new AggregatorFactory[]{ - new FilteredAggregatorFactory( - new CountAggregatorFactory("cnt"), - new SelectorDimFilter("billy", "A") - ) - }, - 1000 - ); + return new OnheapIncrementalIndex(schema, true, 1000); } } - }, { new IndexCreator() @@ -96,16 +110,7 @@ public class IncrementalIndexTest public IncrementalIndex createIndex() { return new OffheapIncrementalIndex( - 0L, - QueryGranularity.NONE, - new AggregatorFactory[]{ - new FilteredAggregatorFactory( - new CountAggregatorFactory("cnt"), - new SelectorDimFilter("billy", "A") - ) - }, - 1000000, - new StupidPool( + schema, true, true, true, 1000000, new StupidPool( new Supplier() { @Override @@ -144,7 +149,7 @@ public class IncrementalIndexTest } @Test(expected = ISE.class) - public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException + public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException { IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); index.add( @@ -182,4 +187,26 @@ public class IncrementalIndexTest ) ); } + + @Test + public void testNullDimensionTransform() throws IndexSizeExceededException + { + IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("string", "float", "long"), + ImmutableMap.of( + "string", Arrays.asList("A", null, ""), + "float", Arrays.asList(Float.MAX_VALUE, null, ""), + "long", Arrays.asList(Long.MIN_VALUE, null, "")) + ) + ); + + Row row = index.iterator().next(); + + Assert.assertArrayEquals(new String[]{"", "", "A"}, (Object[]) row.getRaw("string")); + Assert.assertArrayEquals(new Float[]{null, null, Float.MAX_VALUE}, (Object[]) row.getRaw("float")); + Assert.assertArrayEquals(new Long[]{null, null, Long.MIN_VALUE}, (Object[]) row.getRaw("long")); + } }