Merge pull request #2765 from navis/invalid-encode-nullstring

Null values were being encoded as the string "null" in the incremental index.
Gian Merlino 2016-04-01 14:43:40 -07:00
commit 23d66e5ff9
2 changed files with 61 additions and 29 deletions
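
Background for the fix below: String.valueOf converts a Java null into the literal four-character string "null", so rows with a missing dimension value were being indexed as if they contained the word "null". A minimal standalone sketch of that symptom (plain Java, not Druid code; the class name is made up for illustration):

public class NullValueOfSketch
{
  public static void main(String[] args)
  {
    Object o = null;
    // String.valueOf on a null Object reference returns the string "null",
    // which is what ended up in the incremental index before this change.
    String encoded = String.valueOf(o);
    System.out.println(encoded);                  // prints: null
    System.out.println("null".equals(encoded));   // prints: true
    // The fix below keeps the null instead: o == null ? null : String.valueOf(o)
  }
}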

IncrementalIndex.java

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
@ -38,7 +39,6 @@ import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.NewSpatialDimensionSchema;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@ -101,7 +101,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
@Override
public String apply(final Object o)
{
return String.valueOf(o);
return o == null ? null : String.valueOf(o);
}
};
@ -114,8 +114,9 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return null;
}
if (o instanceof String) {
String s = (String) o;
try {
return Long.valueOf((String) o);
return s.isEmpty() ? null : Long.valueOf(s);
}
catch (NumberFormatException nfe) {
throw new ParseException(nfe, "Unable to parse value[%s] as long in column: ", o);
@ -137,8 +138,9 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return null;
}
if (o instanceof String) {
String s = (String) o;
try {
return Float.valueOf((String) o);
return s.isEmpty() ? null : Float.valueOf(s);
}
catch (NumberFormatException nfe) {
throw new ParseException(nfe, "Unable to parse value[%s] as float in column: ", o);
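
The long and float transformers get the matching treatment: an empty string now maps to null instead of being handed to Long.valueOf / Float.valueOf, which throw NumberFormatException on "" (surfaced by the surrounding catch block as Druid's ParseException). A rough standalone restatement of the new long path (the method name and the non-String branches are illustrative guesses, not the full Druid method):

public class LongTransformSketch
{
  // Mirrors the new behavior: null and "" both come back as null; other strings are parsed.
  static Long toLong(Object o)
  {
    if (o == null) {
      return null;
    }
    if (o instanceof String) {
      String s = (String) o;
      return s.isEmpty() ? null : Long.valueOf(s);   // "" no longer reaches Long.valueOf
    }
    if (o instanceof Number) {
      return ((Number) o).longValue();               // assumed fallback for numeric inputs
    }
    throw new IllegalArgumentException("Unexpected type: " + o.getClass());
  }

  public static void main(String[] args)
  {
    System.out.println(toLong(""));     // null (previously Long.valueOf("") threw NumberFormatException)
    System.out.println(toLong(null));   // null
    System.out.println(toLong("42"));   // 42
  }
}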
@ -355,6 +357,9 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final List<DimDim> dimValues;
// looks like this needs to be configurable
private final Ordering<Comparable> ordering = Ordering.natural().nullsFirst();
private final AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
@ -714,7 +719,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
Comparable[] dimArray = dimValues.toArray(new Comparable[dimValues.size()]);
Arrays.sort(dimArray);
Arrays.sort(dimArray, ordering);
final int[] retVal = new int[dimArray.length];
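
The ordering change matters because dimension value arrays can now legitimately contain nulls, and the single-argument Arrays.sort calls compareTo on the elements themselves, so one null element means a NullPointerException. Guava's Ordering.natural().nullsFirst(), as declared above, simply sorts nulls ahead of every non-null value. A small illustration outside Druid:

import com.google.common.collect.Ordering;
import java.util.Arrays;

public class NullsFirstSortSketch
{
  public static void main(String[] args)
  {
    Ordering<Comparable> ordering = Ordering.natural().nullsFirst();
    Comparable[] dims = {"B", null, "A"};

    // Arrays.sort(dims) alone would throw NullPointerException on the null element.
    Arrays.sort(dims, ordering);

    System.out.println(Arrays.toString(dims));  // [null, A, B]
  }
}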

IncrementalIndexTest.java

@ -25,13 +25,19 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.CloserRule;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.CloserRule;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -66,6 +72,25 @@ public class IncrementalIndexTest
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
DimensionsSpec dimensions = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("string"),
new FloatDimensionSchema("float"),
new LongDimensionSchema("long")
), null, null
);
AggregatorFactory[] metrics = {
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
};
final IncrementalIndexSchema schema = new IncrementalIndexSchema(
0,
QueryGranularity.MINUTE,
dimensions,
metrics
);
return Arrays.asList(
new Object[][]{
{
@ -74,20 +99,9 @@ public class IncrementalIndexTest
@Override
public IncrementalIndex createIndex()
{
return new OnheapIncrementalIndex(
0,
QueryGranularity.MINUTE,
new AggregatorFactory[]{
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
},
1000
);
return new OnheapIncrementalIndex(schema, true, 1000);
}
}
},
{
new IndexCreator()
@ -96,16 +110,7 @@ public class IncrementalIndexTest
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
},
1000000,
new StupidPool<ByteBuffer>(
schema, true, true, true, 1000000, new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
@ -144,7 +149,7 @@ public class IncrementalIndexTest
}
@Test(expected = ISE.class)
public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException
public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException
{
IncrementalIndex index = closer.closeLater(indexCreator.createIndex());
index.add(
@ -182,4 +187,26 @@ public class IncrementalIndexTest
)
);
}
@Test
public void testNullDimensionTransform() throws IndexSizeExceededException
{
IncrementalIndex<?> index = closer.closeLater(indexCreator.createIndex());
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("string", "float", "long"),
ImmutableMap.<String, Object>of(
"string", Arrays.asList("A", null, ""),
"float", Arrays.asList(Float.MAX_VALUE, null, ""),
"long", Arrays.asList(Long.MIN_VALUE, null, ""))
)
);
Row row = index.iterator().next();
Assert.assertArrayEquals(new String[]{"", "", "A"}, (Object[]) row.getRaw("string"));
Assert.assertArrayEquals(new Float[]{null, null, Float.MAX_VALUE}, (Object[]) row.getRaw("float"));
Assert.assertArrayEquals(new Long[]{null, null, Long.MIN_VALUE}, (Object[]) row.getRaw("long"));
}
}
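
Tracing the new test's expectations through the changes above: each multi-value dimension is transformed (null stays null everywhere; "" becomes null only for the long and float columns) and the values are sorted with the nulls-first ordering, which is why the nulls occupy the leading slots. The string assertion additionally suggests that a stored null reads back as an empty string from getRaw. A hand-traced sketch of just the transform-and-sort step (plain Java/Guava, outside the index itself):

import com.google.common.collect.Ordering;
import java.util.Arrays;

public class NullDimensionTraceSketch
{
  public static void main(String[] args)
  {
    Ordering<Comparable> nullsFirst = Ordering.natural().nullsFirst();

    // "long": [Long.MIN_VALUE, null, ""] transforms to [Long.MIN_VALUE, null, null].
    Comparable[] longs = {Long.MIN_VALUE, null, null};
    Arrays.sort(longs, nullsFirst);
    System.out.println(Arrays.toString(longs));   // [null, null, -9223372036854775808]

    // "string": ["A", null, ""] keeps the empty string, so the sorted order is [null, "", "A"];
    // getRaw("string") in the test then reports {"", "", "A"}.
    Comparable[] strings = {"A", null, ""};
    Arrays.sort(strings, nullsFirst);
    System.out.println(Arrays.toString(strings)); // [null, , A]
  }
}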