mirror of https://github.com/apache/druid.git
URIExtractionNamespace: Treat null values in lookup maps as missing entries. (#3512)
* URIExtractionNamespace: Treat null values in lookup maps as missing entries. This is useful when many logical lookups are derived from the same base JSON file, and some lookups' values may sometimes be unknown.
* Add test, logging message, and address other comments.
* Update docs.
This commit is contained in:
parent
e10def32f2
commit
4203580290
|
@ -277,6 +277,8 @@ truck|something,3|buck
|
|||
}
|
||||
```
|
||||
|
||||
With customJson parsing, if the value field for a particular row is missing or null then that line will be skipped, and
|
||||
will not be included in the lookup.
|
||||
|
||||
### simpleJson lookupParseSpec
|
||||
The `simpleJson` lookupParseSpec does not take any parameters. It is simply a line-delimited JSON file in which each field name is the key and that field's value is the value.
|
||||
|
|
|
@ -22,7 +22,7 @@ package io.druid.data.input;
|
|||
import com.google.common.base.Charsets;
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.common.io.LineProcessor;
|
||||
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.parsers.Parser;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -45,35 +45,63 @@ public class MapPopulator<K, V>
|
|||
this.parser = parser;
|
||||
}
|
||||
|
||||
/**
 * Immutable pair of counters describing one populate run: the number of lines
 * read and the number of entries parsed.
 */
public static class PopulateResult
|
||||
{
|
||||
// Number of lines read.
private final int lines;
|
||||
// Number of entries parsed.
private final int entries;
|
||||

||||
/**
 * @param lines   number of lines read
 * @param entries number of entries parsed
 */
public PopulateResult(int lines, int entries)
|
||||
{
|
||||
this.lines = lines;
|
||||
this.entries = entries;
|
||||
}
|
||||

||||
/**
 * @return the line count given to the constructor
 */
public int getLines()
|
||||
{
|
||||
return lines;
|
||||
}
|
||||

||||
/**
 * @return the entry count given to the constructor
 */
public int getEntries()
|
||||
{
|
||||
return entries;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read through the `source` line by line and populate `map` with the data returned from the `parser`
|
||||
*
|
||||
* @param source The ByteSource to read lines from
|
||||
* @param map The map to populate
|
||||
*
|
||||
* @return The number of entries parsed
|
||||
* @return number of lines read and entries parsed
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public long populate(final ByteSource source, final Map<K, V> map) throws IOException
|
||||
public PopulateResult populate(final ByteSource source, final Map<K, V> map) throws IOException
|
||||
{
|
||||
return source.asCharSource(Charsets.UTF_8).readLines(
|
||||
new LineProcessor<Long>()
|
||||
new LineProcessor<PopulateResult>()
|
||||
{
|
||||
private long count = 0L;
|
||||
private int lines = 0;
|
||||
private int entries = 0;
|
||||
|
||||
@Override
|
||||
public boolean processLine(String line) throws IOException
|
||||
{
|
||||
map.putAll(parser.parse(line));
|
||||
++count;
|
||||
if (lines == Integer.MAX_VALUE) {
|
||||
throw new ISE("Cannot read more than %,d lines", Integer.MAX_VALUE);
|
||||
}
|
||||
final Map<K, V> kvMap = parser.parse(line);
|
||||
map.putAll(kvMap);
|
||||
lines++;
|
||||
entries += kvMap.size();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getResult()
|
||||
public PopulateResult getResult()
|
||||
{
|
||||
return count;
|
||||
return new PopulateResult(lines, entries);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -219,13 +219,12 @@ public class URIExtractionNamespace implements ExtractionNamespace
|
|||
key,
|
||||
input
|
||||
).toString(); // Just in case is long
|
||||
final String val = Preconditions.checkNotNull(
|
||||
inner.get(value),
|
||||
"Value column [%s] missing data in line [%s]",
|
||||
value,
|
||||
input
|
||||
).toString();
|
||||
return ImmutableMap.<String, String>of(k, val);
|
||||
final Object val = inner.get(value);
|
||||
if (val == null) {
|
||||
// Skip null or missing values, treat them as if there were no row at all.
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
return ImmutableMap.of(k, val.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -167,13 +167,14 @@ public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCa
|
|||
}
|
||||
};
|
||||
}
|
||||
final long lineCount = new MapPopulator<>(
|
||||
final MapPopulator.PopulateResult populateResult = new MapPopulator<>(
|
||||
extractionNamespace.getNamespaceParseSpec()
|
||||
.getParser()
|
||||
).populate(source, cache);
|
||||
log.info(
|
||||
"Finished loading %d lines for namespace [%s]",
|
||||
lineCount,
|
||||
"Finished loading %,d values from %,d lines for namespace [%s]",
|
||||
populateResult.getEntries(),
|
||||
populateResult.getLines(),
|
||||
id
|
||||
);
|
||||
return version;
|
||||
|
|
|
@ -22,12 +22,15 @@ package io.druid.query.lookup.namespace;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.io.CharSink;
|
||||
import com.google.common.io.Files;
|
||||
import io.druid.data.input.MapPopulator;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.hamcrest.BaseMatcher;
|
||||
import org.hamcrest.Description;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
@ -37,13 +40,24 @@ import org.junit.rules.TemporaryFolder;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class JSONFlatDataParserTest
|
||||
{
|
||||
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
|
||||
private static final String KEY = "foo";
|
||||
private static final String VAL = "bar";
|
||||
private static final String KEY1 = "foo1";
|
||||
private static final String KEY2 = "foo2";
|
||||
private static final String VAL1 = "bar";
|
||||
private static final String VAL2 = "baz";
|
||||
private static final String OTHERVAL1 = "3";
|
||||
private static final String OTHERVAL2 = null;
|
||||
private static final String CANBEEMPTY1 = "";
|
||||
private static final String CANBEEMPTY2 = "notEmpty";
|
||||
private static final List<Map<String, Object>> MAPPINGS = ImmutableList.<Map<String, Object>>of(
|
||||
ImmutableMap.<String, Object>of("key", "foo1", "val", "bar", "otherVal", 3, "canBeEmpty", ""),
|
||||
ImmutableMap.<String, Object>of("key", "foo2", "val", "baz", "canBeEmpty", "notEmpty")
|
||||
);
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
@Rule
|
||||
|
@ -55,7 +69,24 @@ public class JSONFlatDataParserTest
|
|||
{
|
||||
tmpFile = temporaryFolder.newFile("lookup.json");
|
||||
final CharSink sink = Files.asByteSink(tmpFile).asCharSink(Charsets.UTF_8);
|
||||
sink.write("{\"key\":\"" + KEY + "\",\"val\":\"" + VAL + "\"}");
|
||||
sink.writeLines(
|
||||
Iterables.transform(
|
||||
MAPPINGS,
|
||||
new Function<Map<String, Object>, CharSequence>()
|
||||
{
|
||||
@Override
|
||||
public CharSequence apply(Map<String, Object> input)
|
||||
{
|
||||
try {
|
||||
return MAPPER.writeValueAsString(input);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -68,37 +99,51 @@ public class JSONFlatDataParserTest
|
|||
);
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
new MapPopulator<>(parser.getParser()).populate(Files.asByteSource(tmpFile), map);
|
||||
Assert.assertEquals(VAL, map.get(KEY));
|
||||
Assert.assertEquals(VAL1, map.get(KEY1));
|
||||
Assert.assertEquals(VAL2, map.get(KEY2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailParse() throws Exception
|
||||
public void testParseWithNullValues() throws Exception
|
||||
{
|
||||
expectedException.expect(new BaseMatcher<Object>()
|
||||
{
|
||||
@Override
|
||||
public boolean matches(Object o)
|
||||
{
|
||||
if (!(o instanceof NullPointerException)) {
|
||||
return false;
|
||||
}
|
||||
final NullPointerException npe = (NullPointerException) o;
|
||||
return npe.getMessage().startsWith("Key column [keyWHOOPS] missing data in line");
|
||||
}
|
||||
final URIExtractionNamespace.JSONFlatDataParser parser = new URIExtractionNamespace.JSONFlatDataParser(
|
||||
MAPPER,
|
||||
"key",
|
||||
"otherVal"
|
||||
);
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
new MapPopulator<>(parser.getParser()).populate(Files.asByteSource(tmpFile), map);
|
||||
Assert.assertEquals(OTHERVAL1, map.get(KEY1));
|
||||
Assert.assertEquals(OTHERVAL2, map.get(KEY2));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void describeTo(Description description)
|
||||
{
|
||||
/**
 * Checks that an empty-string value is kept as a lookup entry.
 * Parses tmpFile with "key" as the key field and "canBeEmpty" as the value
 * field, then expects KEY1 to map to CANBEEMPTY1 (the empty string) and KEY2
 * to map to CANBEEMPTY2.
 */
@Test
|
||||
public void testParseWithEmptyValues() throws Exception
|
||||
{
|
||||
final URIExtractionNamespace.JSONFlatDataParser parser = new URIExtractionNamespace.JSONFlatDataParser(
|
||||
MAPPER,
|
||||
"key",
|
||||
"canBeEmpty"
|
||||
);
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
// Feed tmpFile through the parser and collect the resulting key/value pairs in the map.
new MapPopulator<>(parser.getParser()).populate(Files.asByteSource(tmpFile), map);
|
||||
// An empty string must come back as "", not as a missing (null) entry.
Assert.assertEquals(CANBEEMPTY1, map.get(KEY1));
|
||||
Assert.assertEquals(CANBEEMPTY2, map.get(KEY2));
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
@Test
|
||||
public void testFailParseOnKeyMissing() throws Exception
|
||||
{
|
||||
final URIExtractionNamespace.JSONFlatDataParser parser = new URIExtractionNamespace.JSONFlatDataParser(
|
||||
MAPPER,
|
||||
"keyWHOOPS",
|
||||
"val"
|
||||
);
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
|
||||
expectedException.expect(NullPointerException.class);
|
||||
expectedException.expectMessage("Key column [keyWHOOPS] missing data in line");
|
||||
|
||||
new MapPopulator<>(parser.getParser()).populate(Files.asByteSource(tmpFile), map);
|
||||
Assert.assertEquals(VAL, map.get(KEY));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue