IncrementalIndex: Fix multi-value dimensions returned from iterators. (#3344)

They had arrays as values, which MapBasedRow doesn't understand and
toStrings rather than converting to lists.
This commit is contained in:
Gian Merlino 2016-08-10 08:47:29 -07:00 committed by Slim
parent b21a98e2f6
commit a2bcd97512
4 changed files with 129 additions and 16 deletions

View File

@ -25,9 +25,9 @@ import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
@ -46,6 +46,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -65,11 +66,9 @@ public class IndexGeneratorCombinerTest
"website",
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "keywords")), null, null)
),
null
),
@ -145,17 +144,19 @@ public class IndexGeneratorCombinerTest
InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableList.<String>of("keywords"),
ImmutableMap.<String, Object>of(
"host", "host1",
"keywords", Arrays.asList("foo", "bar"),
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of(),
ImmutableList.<String>of("keywords"),
ImmutableMap.<String, Object>of(
"host", "host2",
"keywords", Arrays.asList("foo", "bar"),
"visited", 5
)
);
@ -176,10 +177,85 @@ public class IndexGeneratorCombinerTest
context
);
EasyMock.verify(context);
Assert.assertTrue(captureKey.getValue() == key);
InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow.getDimensions());
Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords"));
Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum"));
Assert.assertEquals(2.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001);
}
@Test
public void testMultipleRowsNotMerged() throws Exception
{
long timestamp = System.currentTimeMillis();
Bucket bucket = new Bucket(0, new DateTime(timestamp), 0);
SortableBytes keySortableBytes = new SortableBytes(
bucket.toGroupKey(),
new byte[0]
);
BytesWritable key = keySortableBytes.toBytesWritable();
InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of("host", "keywords"),
ImmutableMap.<String, Object>of(
"host", "host1",
"keywords", Arrays.asList("foo", "bar"),
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.<String>of("host", "keywords"),
ImmutableMap.<String, Object>of(
"host", "host2",
"keywords", Arrays.asList("foo", "bar"),
"visited", 5
)
);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)),
new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true))
);
Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
Capture<BytesWritable> captureKey1 = Capture.newInstance();
Capture<BytesWritable> captureVal1 = Capture.newInstance();
Capture<BytesWritable> captureKey2 = Capture.newInstance();
Capture<BytesWritable> captureVal2 = Capture.newInstance();
context.write(EasyMock.capture(captureKey1), EasyMock.capture(captureVal1));
context.write(EasyMock.capture(captureKey2), EasyMock.capture(captureVal2));
EasyMock.replay(context);
combiner.reduce(
key,
rows,
context
);
EasyMock.verify(context);
Assert.assertTrue(captureKey1.getValue() == key);
Assert.assertTrue(captureKey2.getValue() == key);
InputRow capturedRow1 = InputRowSerde.fromBytes(captureVal1.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow1.getDimensions());
Assert.assertEquals(Arrays.asList("host1"), capturedRow1.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords"));
Assert.assertEquals(10, capturedRow1.getLongMetric("visited_sum"));
Assert.assertEquals(1.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts")), 0.001);
InputRow capturedRow2 = InputRowSerde.fromBytes(captureVal2.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions());
Assert.assertEquals(Arrays.asList("host2"), capturedRow2.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords"));
Assert.assertEquals(5, capturedRow2.getLongMetric("visited_sum"));
Assert.assertEquals(1.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts")), 0.001);
}
}

View File

@ -939,13 +939,13 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
theVals.put(dimensionName, val);
} else {
Comparable[] dimVals = new Comparable[dim.length];
for (int j = 0; j < dimVals.length; j++) {
List<Comparable> dimVals = new ArrayList<Comparable>(dim.length);
for (int j = 0; j < dim.length; j++) {
Comparable val = dimensionDesc.getValues().getValue(dim[j]);
if (type == ValueType.STRING) {
val = Strings.nullToEmpty((String) val);
}
dimVals[j] = val;
dimVals.add(val);
}
theVals.put(dimensionName, dimVals);
}

View File

@ -307,14 +307,43 @@ public class IncrementalIndexTest
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_bound_filtered"),
new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
),
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_multivaldim_filtered"),
new SelectorDimFilter("dim3", "b", null)
),
new FilteredAggregatorFactory(
new CountAggregatorFactory("count_numeric_filtered"),
new SelectorDimFilter("met1", "11", null)
)
})
);
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2", "dim3"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "dim3", Lists.newArrayList("b", "a"), "met1", 10)
)
);
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("dim1", "dim2", "dim3"),
ImmutableMap.<String, Object>of("dim1", "3", "dim2", "4", "dim3", Lists.newArrayList("c", "d"), "met1", 11)
)
);
Assert.assertEquals(Arrays.asList("dim1", "dim2", "dim3"), index.getDimensionNames());
Assert.assertEquals(
Arrays.asList("count", "count_selector_filtered", "count_bound_filtered"),
Arrays.asList(
"count",
"count_selector_filtered",
"count_bound_filtered",
"count_multivaldim_filtered",
"count_numeric_filtered"
),
index.getMetricNames()
);
Assert.assertEquals(2, index.size());
@ -324,15 +353,23 @@ public class IncrementalIndexTest
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("1"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("2"), row.getDimension("dim2"));
Assert.assertEquals(Arrays.asList("a", "b"), row.getDimension("dim3"));
Assert.assertEquals(1L, row.getLongMetric("count"));
Assert.assertEquals(1L, row.getLongMetric("count_selector_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_bound_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_multivaldim_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_numeric_filtered"));
row = rows.next();
Assert.assertEquals(timestamp, row.getTimestampFromEpoch());
Assert.assertEquals(Arrays.asList("3"), row.getDimension("dim1"));
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
Assert.assertEquals(Arrays.asList("c", "d"), row.getDimension("dim3"));
Assert.assertEquals(1L, row.getLongMetric("count"));
Assert.assertEquals(0L, row.getLongMetric("count_selector_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_bound_filtered"));
Assert.assertEquals(0L, row.getLongMetric("count_multivaldim_filtered"));
Assert.assertEquals(1L, row.getLongMetric("count_numeric_filtered"));
}
@Test

View File

@ -215,9 +215,9 @@ public class IncrementalIndexTest
Row row = index.iterator().next();
Assert.assertArrayEquals(new String[]{"", "", "A"}, (Object[]) row.getRaw("string"));
Assert.assertArrayEquals(new Float[]{null, null, Float.MAX_VALUE}, (Object[]) row.getRaw("float"));
Assert.assertArrayEquals(new Long[]{null, null, Long.MIN_VALUE}, (Object[]) row.getRaw("long"));
Assert.assertEquals(Arrays.asList(new String[]{"", "", "A"}), row.getRaw("string"));
Assert.assertEquals(Arrays.asList(new Float[]{null, null, Float.MAX_VALUE}), row.getRaw("float"));
Assert.assertEquals(Arrays.asList(new Long[]{null, null, Long.MIN_VALUE}), row.getRaw("long"));
}
@Test