sampler returns nulls in CSV (#8871)

* sampler returns nulls in CSV

* fixed kafka sampler test

* fix Kinesis test

* sql compatibility fix

* remove null to empty string conversion, use null

* fix sql compatibility
This commit is contained in:
Rye 2019-11-19 13:59:44 -08:00 committed by Jonathan Wei
parent c44452f0c1
commit d0913475b7
4 changed files with 514 additions and 98 deletions

View File

@ -56,6 +56,7 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -183,43 +184,55 @@ public class KafkaSamplerSpecTest
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
"{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
ImmutableMap.<String, Object>builder()
.put("__time", 1199145600000L)
.put("dim1", "a")
.put("dim2", "y")
.put("dimLong", 10L)
.put("dimFloat", 20.0F)
.put("rows", 1L)
.put("met1sum", 1.0)
.build(),
new HashMap<String, Object>()
{
{
put("__time", 1199145600000L);
put("dim1", "a");
put("dim1t", null);
put("dim2", "y");
put("dimLong", 10L);
put("dimFloat", 20.0F);
put("rows", 1L);
put("met1sum", 1.0);
}
},
null,
null
), it.next());
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
"{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
ImmutableMap.<String, Object>builder()
.put("__time", 1230768000000L)
.put("dim1", "b")
.put("dim2", "y")
.put("dimLong", 10L)
.put("dimFloat", 20.0F)
.put("rows", 1L)
.put("met1sum", 1.0)
.build(),
new HashMap<String, Object>()
{
{
put("__time", 1230768000000L);
put("dim1", "b");
put("dim1t", null);
put("dim2", "y");
put("dimLong", 10L);
put("dimFloat", 20.0F);
put("rows", 1L);
put("met1sum", 1.0);
}
},
null,
null
), it.next());
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
"{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
ImmutableMap.<String, Object>builder()
.put("__time", 1262304000000L)
.put("dim1", "c")
.put("dim2", "y")
.put("dimLong", 10L)
.put("dimFloat", 20.0F)
.put("rows", 1L)
.put("met1sum", 1.0)
.build(),
new HashMap<String, Object>()
{
{
put("__time", 1262304000000L);
put("dim1", "c");
put("dim1t", null);
put("dim2", "y");
put("dimLong", 10L);
put("dimFloat", 20.0F);
put("rows", 1L);
put("met1sum", 1.0);
}
},
null,
null
), it.next());

View File

@ -60,6 +60,7 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -201,43 +202,55 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
"{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
ImmutableMap.<String, Object>builder()
.put("__time", 1199145600000L)
.put("dim1", "a")
.put("dim2", "y")
.put("dimLong", 10L)
.put("dimFloat", 20.0F)
.put("rows", 1L)
.put("met1sum", 1.0)
.build(),
new HashMap<String, Object>()
{
{
put("__time", 1199145600000L);
put("dim1", "a");
put("dim1t", null);
put("dim2", "y");
put("dimLong", 10L);
put("dimFloat", 20.0F);
put("rows", 1L);
put("met1sum", 1.0);
}
},
null,
null
), it.next());
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
"{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
ImmutableMap.<String, Object>builder()
.put("__time", 1230768000000L)
.put("dim1", "b")
.put("dim2", "y")
.put("dimLong", 10L)
.put("dimFloat", 20.0F)
.put("rows", 1L)
.put("met1sum", 1.0)
.build(),
new HashMap<String, Object>()
{
{
put("__time", 1230768000000L);
put("dim1", "b");
put("dim1t", null);
put("dim2", "y");
put("dimLong", 10L);
put("dimFloat", 20.0F);
put("rows", 1L);
put("met1sum", 1.0);
}
},
null,
null
), it.next());
Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
"{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
ImmutableMap.<String, Object>builder()
.put("__time", 1262304000000L)
.put("dim1", "c")
.put("dim2", "y")
.put("dimLong", 10L)
.put("dimFloat", 20.0F)
.put("rows", 1L)
.put("met1sum", 1.0)
.build(),
new HashMap<String, Object>()
{
{
put("__time", 1262304000000L);
put("dim1", "c");
put("dim1t", null);
put("dim2", "y");
put("dimLong", 10L);
put("dimFloat", 20.0F);
put("rows", 1L);
put("met1sum", 1.0);
}
},
null,
null
), it.next());

View File

@ -256,9 +256,7 @@ public class FirehoseSampler
Map<String, Object> parsed = new HashMap<>();
columnNames.forEach(k -> {
if (row.getRaw(k) != null) {
parsed.put(k, row.getRaw(k));
}
parsed.put(k, row.getRaw(k));
});
parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());

View File

@ -59,6 +59,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -210,6 +211,106 @@ public class FirehoseSamplerTest
Assert.assertEquals(data, response.getData());
}
@Test
public void testCSVColumnAllNull()
{
parserType = ParserType.STR_CSV;
final List<Object> str_csv_rows = ImmutableList.of(
"FirstName,LastName,Number,Gender",
"J,G,,Male",
"Kobe,Bryant,,Male",
"Lisa, Krystal,,Female",
"Michael,Jackson,,Male"
);
FirehoseFactory firehoseFactory = getFirehoseFactory(str_csv_rows);
ParseSpec parseSpec = new DelimitedParseSpec(
new TimestampSpec(null, null, DateTimes.of("1970")),
new DimensionsSpec(null),
",",
null,
null,
true,
0
);
DataSchema dataSchema = new DataSchema("sampler", OBJECT_MAPPER.convertValue(
new StringInputRowParser(parseSpec, StandardCharsets.UTF_8.name()),
new TypeReference<Map<String, Object>>()
{
}
), null, null, null, OBJECT_MAPPER);
SamplerResponse response = firehoseSampler.sample(firehoseFactory, dataSchema, null);
Assert.assertEquals(4, (int) response.getNumRowsRead());
Assert.assertEquals(4, (int) response.getNumRowsIndexed());
Assert.assertEquals(4, response.getData().size());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
str_csv_rows.get(1).toString(),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("Number", null);
put("FirstName", "J");
put("LastName", "G");
put("Gender", "Male");
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
str_csv_rows.get(2).toString(),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("Number", null);
put("FirstName", "Kobe");
put("LastName", "Bryant");
put("Gender", "Male");
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
str_csv_rows.get(3).toString(),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("Number", null);
put("FirstName", "Lisa");
put("LastName", " Krystal");
put("Gender", "Female");
}
}),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
str_csv_rows.get(4).toString(),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("Number", null);
put("FirstName", "Michael");
put("LastName", "Jackson");
put("Gender", "Male");
}
}),
null,
null
), data.get(3));
}
@Test
public void testMissingValueTimestampSpec()
{
@ -224,41 +325,95 @@ public class FirehoseSamplerTest
Assert.assertEquals(6, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "met1", "1"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("t", "2019-04-22T12:00");
put("dim2", null);
put("dim1", "foo");
put("met1", "1");
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "met1", "2"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("t", "2019-04-22T12:00");
put("dim2", null);
put("dim1", "foo");
put("met1", "2");
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:01", "dim1", "foo", "met1", "3"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("t", "2019-04-22T12:01");
put("dim2", null);
put("dim1", "foo");
put("met1", "3");
}
}),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo2", "met1", "4"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("t", "2019-04-22T12:00");
put("dim2", null);
put("dim1", "foo2");
put("met1", "4");
}
}),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 0L, "t", "2019-04-22T12:00", "dim1", "foo", "dim2", "bar", "met1", "5"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("t", "2019-04-22T12:00");
put("dim2", "bar");
put("dim1", "foo");
put("met1", "5");
}
}),
null,
null
), data.get(4));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(5).toString(),
ImmutableMap.of("__time", 0L, "t", "bad_timestamp", "dim1", "foo", "met1", "6"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 0L);
put("t", "bad_timestamp");
put("dim2", null);
put("dim1", "foo");
put("met1", "6");
}
}),
null,
null
), data.get(5));
@ -278,35 +433,75 @@ public class FirehoseSamplerTest
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "1"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", "1");
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "2"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", "2");
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 1555934460000L, "dim1", "foo", "met1", "3"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934460000L);
put("dim2", null);
put("dim1", "foo");
put("met1", "3");
}
}),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", "4"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo2");
put("met1", "4");
}
}),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", "5"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", "bar");
put("dim1", "foo");
put("met1", "5");
}
}),
null,
null
), data.get(4));
@ -342,31 +537,66 @@ public class FirehoseSamplerTest
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "1"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo");
put("met1", "1");
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "2"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo");
put("met1", "2");
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 1555934460000L, "dim1", "foo", "met1", "3"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934460000L);
put("dim1", "foo");
put("met1", "3");
}
}),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", "4"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo2");
put("met1", "4");
}
}),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", "5"),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo");
put("met1", "5");
}
}),
null,
null
), data.get(4));
@ -401,35 +631,75 @@ public class FirehoseSamplerTest
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(6, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 1L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", 1L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(1).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 2L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", 2L);
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(2).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 3L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", 3L);
}
}),
null,
null
), data.get(2));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo2");
put("met1", 4L);
}
}),
null,
null
), data.get(3));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", "bar");
put("dim1", "foo");
put("met1", 5L);
}
}),
null,
null
), data.get(4));
@ -464,23 +734,47 @@ public class FirehoseSamplerTest
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(4, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", 6L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo2");
put("met1", 4L);
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", "bar");
put("dim1", "foo");
put("met1", 5L);
}
}),
null,
null
), data.get(2));
@ -522,13 +816,27 @@ public class FirehoseSamplerTest
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 11L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo");
put("met1", 11L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo2");
put("met1", 4L);
}
}),
null,
null
), data.get(1));
@ -575,13 +883,27 @@ public class FirehoseSamplerTest
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 11L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo");
put("met1", 11L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1", "foo2");
put("met1", 4L);
}
}),
null,
null
), data.get(1));
@ -624,23 +946,47 @@ public class FirehoseSamplerTest
Assert.assertEquals(5, (int) response.getNumRowsIndexed());
Assert.assertEquals(4, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", 6L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo2", "met1", 4L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo2");
put("met1", 4L);
}
}),
null,
null
), data.get(1));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", "bar");
put("dim1", "foo");
put("met1", 5L);
}
}),
null,
null
), data.get(2));
@ -693,13 +1039,27 @@ public class FirehoseSamplerTest
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1PlusBar", "foobar", "met1", 11L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1PlusBar", "foobar");
put("met1", 11L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(3).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1PlusBar", "foo2bar", "met1", 4L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim1PlusBar", "foo2bar");
put("met1", 4L);
}
}),
null,
null
), data.get(1));
@ -738,17 +1098,33 @@ public class FirehoseSamplerTest
Assert.assertEquals(4, (int) response.getNumRowsIndexed());
Assert.assertEquals(3, response.getData().size());
List<SamplerResponseRow> data = removeEmptyColumns(response.getData());
List<SamplerResponseRow> data = response.getData();
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(0).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "met1", 6L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", null);
put("dim1", "foo");
put("met1", 6L);
}
}),
null,
null
), data.get(0));
Assert.assertEquals(new SamplerResponseRow(
getTestRows().get(4).toString(),
ImmutableMap.of("__time", 1555934400000L, "dim1", "foo", "dim2", "bar", "met1", 5L),
replaceNullValues(new HashMap<String, Object>()
{
{
put("__time", 1555934400000L);
put("dim2", "bar");
put("dim1", "foo");
put("met1", 5L);
}
}),
null,
null
), data.get(1));
@ -827,6 +1203,7 @@ public class FirehoseSamplerTest
: rows.stream().map(x -> x.withParsed(removeEmptyValues(x.getParsed()))).collect(Collectors.toList());
}
@Nullable
private Map<String, Object> removeEmptyValues(Map<String, Object> data)
{
@ -836,4 +1213,19 @@ public class FirehoseSamplerTest
.filter(x -> !(x.getValue() instanceof String) || !((String) x.getValue()).isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Nullable
private Map<String, Object> replaceNullValues(Map<String, Object> data)
{
return ParserType.STR_CSV.equals(parserType)
? USE_DEFAULT_VALUE_FOR_NULL
? data
: data.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue() == null ? "" : e.getValue()
))
: data;
}
}