Add support for headers and skipping thereof for CSV and TSV (#4254)

* initial commit

* small fixes

* fix bug

* fix bug

* address code review

* more cr

* more cr

* more cr

* fix

* Skip head rows for CSV and TSV

* Move checking skipHeadRows to FileIteratingFirehose

* Remove checking null iterators

* Remove unused imports

* Address comments

* Fix compilation error

* Address comments

* Add more tests

* Add a comment to ReplayableFirehose

* Addressing comments

* Add docs and fix typos
Jihoon Son 2017-05-16 14:57:31 +09:00 committed by Jonathan Wei
parent 5ca67644e7
commit 50a4ec2b0b
36 changed files with 823 additions and 216 deletions


@ -19,6 +19,7 @@
package io.druid.data.input;
import javax.annotation.Nullable;
import java.io.Closeable;
/**
@ -46,14 +47,16 @@ public interface Firehose extends Closeable
*
* @return true if and when there is another row available, false if the stream has dried up
*/
public boolean hasMore();
boolean hasMore();
/**
* The next row available. Should only be called if hasMore returns true.
* The return value can be null, which means the caller must skip this row.
*
* @return The next row
*/
public InputRow nextRow();
@Nullable
InputRow nextRow();
/**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
@ -74,5 +77,5 @@ public interface Firehose extends Closeable
* because of InputRows delivered by prior calls to ##nextRow().
* </p>
*/
public Runnable commit();
Runnable commit();
}


@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.parsers.CSVParser;
import io.druid.java.util.common.parsers.Parser;
@ -35,26 +34,38 @@ public class CSVParseSpec extends ParseSpec
{
private final String listDelimiter;
private final List<String> columns;
private final boolean hasHeaderRow;
private final int skipHeaderRows;
@JsonCreator
public CSVParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("columns") List<String> columns
@JsonProperty("columns") List<String> columns,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(timestampSpec, dimensionsSpec);
this.listDelimiter = listDelimiter;
Preconditions.checkNotNull(columns, "columns");
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
this.columns = columns;
this.hasHeaderRow = hasHeaderRow;
this.skipHeaderRows = skipHeaderRows;
verify(dimensionsSpec.getDimensionNames());
if (columns != null) {
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}
@JsonProperty
@ -69,6 +80,18 @@ public class CSVParseSpec extends ParseSpec
return columns;
}
@JsonProperty
public boolean isHasHeaderRow()
{
return hasHeaderRow;
}
@JsonProperty("skipHeaderRows")
public int getSkipHeaderRows()
{
return skipHeaderRows;
}
@Override
public void verify(List<String> usedCols)
{
@ -80,23 +103,23 @@ public class CSVParseSpec extends ParseSpec
@Override
public Parser<String, Object> makeParser()
{
return new CSVParser(Optional.fromNullable(listDelimiter), columns);
return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow, skipHeaderRows);
}
@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns);
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow, skipHeaderRows);
}
@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns);
return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow, skipHeaderRows);
}
public ParseSpec withColumns(List<String> cols)
{
return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols);
return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow, skipHeaderRows);
}
}


@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.parsers.DelimitedParser;
import io.druid.java.util.common.parsers.Parser;
@ -36,6 +35,8 @@ public class DelimitedParseSpec extends ParseSpec
private final String delimiter;
private final String listDelimiter;
private final List<String> columns;
private final boolean hasHeaderRow;
private final int skipHeaderRows;
@JsonCreator
public DelimitedParseSpec(
@ -43,20 +44,31 @@ public class DelimitedParseSpec extends ParseSpec
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("delimiter") String delimiter,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("columns") List<String> columns
@JsonProperty("columns") List<String> columns,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(timestampSpec, dimensionsSpec);
this.delimiter = delimiter;
this.listDelimiter = listDelimiter;
Preconditions.checkNotNull(columns, "columns");
this.columns = columns;
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
this.hasHeaderRow = hasHeaderRow;
this.skipHeaderRows = skipHeaderRows;
verify(dimensionsSpec.getDimensionNames());
if (columns != null) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}
@JsonProperty("delimiter")
@ -77,6 +89,18 @@ public class DelimitedParseSpec extends ParseSpec
return columns;
}
@JsonProperty
public boolean isHasHeaderRow()
{
return hasHeaderRow;
}
@JsonProperty("skipHeaderRows")
public int getSkipHeaderRows()
{
return skipHeaderRows;
}
@Override
public void verify(List<String> usedCols)
{
@ -88,38 +112,79 @@ public class DelimitedParseSpec extends ParseSpec
@Override
public Parser<String, Object> makeParser()
{
Parser<String, Object> retVal = new DelimitedParser(
return new DelimitedParser(
Optional.fromNullable(delimiter),
Optional.fromNullable(listDelimiter)
Optional.fromNullable(listDelimiter),
columns,
hasHeaderRow,
skipHeaderRows
);
retVal.setFieldNames(columns);
return retVal;
}
@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns);
return new DelimitedParseSpec(
spec,
getDimensionsSpec(),
delimiter,
listDelimiter,
columns,
hasHeaderRow,
skipHeaderRows
);
}
@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns);
return new DelimitedParseSpec(
getTimestampSpec(),
spec,
delimiter,
listDelimiter,
columns,
hasHeaderRow,
skipHeaderRows
);
}
public ParseSpec withDelimiter(String delim)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns);
return new DelimitedParseSpec(
getTimestampSpec(),
getDimensionsSpec(),
delim,
listDelimiter,
columns,
hasHeaderRow,
skipHeaderRows
);
}
public ParseSpec withListDelimiter(String delim)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns);
return new DelimitedParseSpec(
getTimestampSpec(),
getDimensionsSpec(),
delimiter,
delim,
columns,
hasHeaderRow,
skipHeaderRows
);
}
public ParseSpec withColumns(List<String> cols)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols);
return new DelimitedParseSpec(
getTimestampSpec(),
getDimensionsSpec(),
delimiter,
listDelimiter,
cols,
hasHeaderRow,
skipHeaderRows
);
}
}


@ -19,7 +19,6 @@
package io.druid.data.input.impl;
import com.google.common.base.Throwables;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.utils.Runnables;
@ -27,6 +26,7 @@ import org.apache.commons.io.LineIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
*/
@ -50,7 +50,7 @@ public class FileIteratingFirehose implements Firehose
public boolean hasMore()
{
while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) {
lineIterator = lineIterators.next();
lineIterator = getNextLineIterator();
}
return lineIterator != null && lineIterator.hasNext();
@ -59,21 +59,22 @@ public class FileIteratingFirehose implements Firehose
@Override
public InputRow nextRow()
{
try {
if (lineIterator == null || !lineIterator.hasNext()) {
// Close old streams, maybe.
if (lineIterator != null) {
lineIterator.close();
}
lineIterator = lineIterators.next();
}
return parser.parse(lineIterator.next());
if (!hasMore()) {
throw new NoSuchElementException();
}
catch (Exception e) {
throw Throwables.propagate(e);
return parser.parse(lineIterator.next());
}
private LineIterator getNextLineIterator()
{
if (lineIterator != null) {
lineIterator.close();
}
final LineIterator iterator = lineIterators.next();
parser.startFileFromBeginning();
return iterator;
}
@Override


@ -22,12 +22,12 @@ package io.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.common.parsers.Parser;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
@ -124,18 +124,30 @@ public class StringInputRowParser implements ByteBufferInputRowParser
return theMap;
}
private Map<String, Object> parseString(String inputString)
public void startFileFromBeginning()
{
return parser.parse(inputString);
parser.startFileFromBeginning();
}
public InputRow parse(String input)
@Nullable
public InputRow parse(@Nullable String input)
{
return parseMap(parseString(input));
}
private InputRow parseMap(Map<String, Object> theMap)
@Nullable
private Map<String, Object> parseString(@Nullable String inputString)
{
return parser.parse(inputString);
}
@Nullable
private InputRow parseMap(@Nullable Map<String, Object> theMap)
{
// If a header is present in the data (and the configuration expects one), null is returned
if (theMap == null) {
return null;
}
return mapParser.parse(theMap);
}
}


@ -21,7 +21,6 @@ package io.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.parsers.Parser;
import java.util.List;


@ -41,7 +41,9 @@ public class CSVParseSpecTest
Lists.<SpatialDimensionSchema>newArrayList()
),
",",
Arrays.asList("a")
Arrays.asList("a"),
false,
0
);
}
@ -60,7 +62,9 @@ public class CSVParseSpecTest
Lists.<SpatialDimensionSchema>newArrayList()
),
",",
Arrays.asList("a")
Arrays.asList("a"),
false,
0
);
}
}


@ -40,7 +40,9 @@ public class DelimitedParseSpecTest
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("abc")), null, null),
"\u0001",
"\u0002",
Arrays.asList("abc")
Arrays.asList("abc"),
false,
0
);
final DelimitedParseSpec serde = jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
@ -71,7 +73,9 @@ public class DelimitedParseSpecTest
),
",",
" ",
Arrays.asList("a")
Arrays.asList("a"),
false,
0
);
}
@ -91,12 +95,15 @@ public class DelimitedParseSpecTest
),
",",
null,
Arrays.asList("a")
Arrays.asList("a"),
false,
0
);
}
@Test(expected = NullPointerException.class)
public void testDefaultColumnList(){
@Test(expected = IllegalArgumentException.class)
public void testDefaultColumnList()
{
final DelimitedParseSpec spec = new DelimitedParseSpec(
new TimestampSpec(
"timestamp",
@ -110,8 +117,9 @@ public class DelimitedParseSpecTest
),
",",
null,
// pass null columns not allowed
null
null,
false,
0
);
}
}


@ -19,67 +19,107 @@
package io.druid.data.input.impl;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.java.util.common.Pair;
import junit.framework.Assert;
import io.druid.data.input.InputRow;
import org.apache.commons.io.LineIterator;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class FileIteratingFirehoseTest
{
private static final List<Pair<String[], ImmutableList<String>>> fixtures = ImmutableList.of(
Pair.of(new String[]{"2000,foo"}, ImmutableList.of("foo")),
Pair.of(new String[]{"2000,foo\n2000,bar\n"}, ImmutableList.of("foo", "bar")),
Pair.of(new String[]{"2000,foo\n2000,bar\n", "2000,baz"}, ImmutableList.of("foo", "bar", "baz")),
Pair.of(new String[]{"2000,foo\n2000,bar\n", "", "2000,baz"}, ImmutableList.of("foo", "bar", "baz")),
Pair.of(new String[]{"2000,foo\n2000,bar\n", "", "2000,baz", ""}, ImmutableList.of("foo", "bar", "baz")),
Pair.of(new String[]{""}, ImmutableList.<String>of()),
Pair.of(new String[]{}, ImmutableList.<String>of())
);
@Parameters(name = "{0}, {1}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final List<List<String>> inputTexts = ImmutableList.of(
ImmutableList.of("2000,foo"),
ImmutableList.of("2000,foo\n2000,bar\n"),
ImmutableList.of("2000,foo\n2000,bar\n", "2000,baz"),
ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz"),
ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz", ""),
ImmutableList.of("2000,foo\n2000,bar\n2000,baz", "", "2000,baz", "2000,foo\n2000,bar\n3000,baz"),
ImmutableList.of(""),
ImmutableList.of()
);
final List<Object[]> args = new ArrayList<>();
for (int numSkipHeadRows = 0; numSkipHeadRows < 3; numSkipHeadRows++) {
for (List<String> texts : inputTexts) {
args.add(new Object[] {texts, numSkipHeadRows});
}
}
return args;
}
private final StringInputRowParser parser;
private final List<String> inputs;
private final List<String> expectedResults;
public FileIteratingFirehoseTest(List<String> texts, int numSkipHeaderRows)
{
parser = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null),
",",
ImmutableList.of("ts", "x"),
false,
numSkipHeaderRows
),
null
);
this.inputs = texts;
this.expectedResults = inputs.stream()
.map(input -> input.split("\n"))
.flatMap(lines -> {
final List<String> filteredLines = Arrays.asList(lines).stream()
.filter(line -> line.length() > 0)
.map(line -> line.split(",")[1])
.collect(Collectors.toList());
final int numRealSkippedRows = Math.min(filteredLines.size(), numSkipHeaderRows);
IntStream.range(0, numRealSkippedRows).forEach(i -> filteredLines.set(i, null));
return filteredLines.stream();
})
.collect(Collectors.toList());
}
@Test
public void testFirehose() throws Exception
{
for (Pair<String[], ImmutableList<String>> fixture : fixtures) {
final List<LineIterator> lineIterators = Lists.transform(
Arrays.asList(fixture.lhs),
new Function<String, LineIterator>()
{
@Override
public LineIterator apply(String s)
{
return new LineIterator(new StringReader(s));
}
}
);
final List<LineIterator> lineIterators = inputs.stream()
.map(s -> new LineIterator(new StringReader(s)))
.collect(Collectors.toList());
final StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null),
",",
ImmutableList.of("ts", "x")
),
null
);
final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser);
try (final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser)) {
final List<String> results = Lists.newArrayList();
while (firehose.hasMore()) {
results.add(Joiner.on("|").join(firehose.nextRow().getDimension("x")));
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
results.add(null);
} else {
results.add(Joiner.on("|").join(inputRow.getDimension("x")));
}
}
Assert.assertEquals(fixture.rhs, results);
Assert.assertEquals(expectedResults, results);
}
}
}


@ -45,7 +45,9 @@ public class ParseSpecTest
),
",",
" ",
Arrays.asList("a", "b")
Arrays.asList("a", "b"),
false,
0
);
}
@ -65,7 +67,9 @@ public class ParseSpecTest
),
",",
null,
Arrays.asList("a", "B")
Arrays.asList("a", "B"),
false,
0
);
}
@ -85,7 +89,9 @@ public class ParseSpecTest
),
",",
null,
Arrays.asList("a", "B")
Arrays.asList("a", "B"),
false,
0
);
}
}


@ -195,12 +195,17 @@ The `namespaceParseSpec` can be one of a number of values. Each of the examples
Only ONE file which matches the search will be used. For most implementations, the URI chosen is whichever one reports the most recent modification time.
### csv lookupParseSpec
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`columns`|The list of columns in the csv file|yes|`null`|
|`columns`|The list of columns in the csv file|no if `hasHeaderRow` is set|`null`|
|`keyColumn`|The name of the column containing the key|no|The first column|
|`valueColumn`|The name of the column containing the value|no|The second column|
|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false|
|`skipHeaderRows`|Number of header rows to be skipped|no|0|
If both `skipHeaderRows` and `hasHeaderRow` options are set, `skipHeaderRows` is first applied. For example, if you set
`skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will skip the first two lines and then extract column information
from the third line.
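As a minimal sketch, a header-driven csv spec could look like the following; the `format` value and the key/value column names here are illustrative assumptions, not taken from the example below:

```json
"namespaceParseSpec": {
  "format": "csv",
  "hasHeaderRow": true,
  "keyColumn": "key",
  "valueColumn": "value"
}
```

Because `hasHeaderRow` is true, `columns` is omitted and the column names are read from the first non-skipped line.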
*example input*
@ -222,15 +227,19 @@ truck,something3,buck
```
### tsv lookupParseSpec
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`columns`|The list of columns in the csv file|yes|`null`|
|`columns`|The list of columns in the tsv file|yes|`null`|
|`keyColumn`|The name of the column containing the key|no|The first column|
|`valueColumn`|The name of the column containing the value|no|The second column|
|`delimiter`|The delimiter in the file|no|tab (`\t`)|
|`listDelimiter`|The list delimiter in the file|no| (`\u0001`)|
|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false|
|`skipHeaderRows`|Number of header rows to be skipped|no|0|
If both `skipHeaderRows` and `hasHeaderRow` options are set, `skipHeaderRows` is first applied. For example, if you set
`skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will skip the first two lines and then extract column information
from the third line.
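A sketch of the skip-then-header case described above, for tsv (again, the `format` value, delimiter, and column names are illustrative assumptions):

```json
"namespaceParseSpec": {
  "format": "tsv",
  "delimiter": "\t",
  "skipHeaderRows": 2,
  "hasHeaderRow": true,
  "keyColumn": "key",
  "valueColumn": "value"
}
```

With this spec, the first two lines are discarded, the third line is treated as the header, and key/value data starts on the fourth line.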
*example input*


@ -72,10 +72,8 @@ If you have nested JSON, [Druid can automatically flatten it for you](flatten-js
### CSV
Since the CSV data cannot contain the column names (no header is allowed), these must be added before that data can be processed:
```json
"parseSpec":{
"parseSpec": {
"format" : "csv",
"timestampSpec" : {
"column" : "timestamp"
@ -87,12 +85,27 @@ Since the CSV data cannot contain the column names (no header is allowed), these
}
```
The `columns` field must match the columns of your input data in the same order.
#### CSV Index Tasks
### TSV
If your input files contain a header, the `columns` field is optional and you don't need to set it.
Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header.
Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data in the same order.
Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set,
`skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will
skip the first two lines and then extract column information from the third line.
Note that `hasHeaderRow` and `skipHeaderRows` are effective only for non-Hadoop batch index tasks. Other types of index
tasks will fail with an exception.
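For example, a header-driven CSV parseSpec for an index task might look like the sketch below. It mirrors the `testCSVFileWithHeader` test added in this patch; the timestamp column name is a placeholder:

```json
"parseSpec": {
  "format": "csv",
  "hasHeaderRow": true,
  "timestampSpec": {
    "column": "timestamp"
  },
  "dimensionsSpec": {
    "dimensions": []
  }
}
```

With `columns` omitted and `dimensionsSpec.dimensions` left empty, the column names come from the header and all remaining non-timestamp, non-metric columns are treated as dimensions.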
#### Other CSV Ingestion Tasks
The `columns` field must be included, and it must match the columns of your input data in the same order.
### TSV (Delimited)
```json
"parseSpec":{
"parseSpec": {
"format" : "tsv",
"timestampSpec" : {
"column" : "timestamp"
@ -105,10 +118,25 @@ The `columns` field must match the columns of your input data in the same order.
}
```
The `columns` field must match the columns of your input data in the same order.
Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed.
#### TSV (Delimited) Index Tasks
If your input files contain a header, the `columns` field is optional and you don't need to set it.
Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header.
Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data in the same order.
Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set,
`skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will
skip the first two lines and then extract column information from the third line.
Note that `hasHeaderRow` and `skipHeaderRows` are effective only for non-Hadoop batch index tasks. Other types of index
tasks will fail with an exception.
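As a sketch of the other combination (explicit `columns` while a stray header line is skipped), a TSV parseSpec could look like this; the column names are placeholders:

```json
"parseSpec": {
  "format": "tsv",
  "delimiter": "\t",
  "skipHeaderRows": 1,
  "columns": ["timestamp", "page", "language"],
  "timestampSpec": {
    "column": "timestamp"
  },
  "dimensionsSpec": {
    "dimensions": ["page", "language"]
  }
}
```

Here the file's own header line is thrown away and the names given in `columns` are used instead.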
#### Other TSV (Delimited) Ingestion Tasks
The `columns` field must be included, and it must match the columns of your input data in the same order.
### Regex
```json


@ -96,7 +96,9 @@ public class MapVirtualColumnTest
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim", "keys", "values")), null, null),
"\t",
",",
Arrays.asList("ts", "dim", "keys", "values")
Arrays.asList("ts", "dim", "keys", "values"),
false,
0
)
, "utf8"
);


@ -27,13 +27,13 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.UOE;
@ -41,7 +41,6 @@ import io.druid.java.util.common.parsers.CSVParser;
import io.druid.java.util.common.parsers.DelimitedParser;
import io.druid.java.util.common.parsers.JSONParser;
import io.druid.java.util.common.parsers.Parser;
import org.joda.time.Period;
import javax.annotation.Nullable;
@ -264,7 +263,9 @@ public class URIExtractionNamespace implements ExtractionNamespace
public CSVFlatDataParser(
@JsonProperty("columns") List<String> columns,
@JsonProperty("keyColumn") final String keyColumn,
@JsonProperty("valueColumn") final String valueColumn
@JsonProperty("valueColumn") final String valueColumn,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
Preconditions.checkArgument(
@ -293,12 +294,22 @@ public class URIExtractionNamespace implements ExtractionNamespace
);
this.parser = new DelegateParser(
new CSVParser(Optional.<String>absent(), columns),
new CSVParser(Optional.absent(), columns, hasHeaderRow, skipHeaderRows),
this.keyColumn,
this.valueColumn
);
}
@VisibleForTesting
CSVFlatDataParser(
List<String> columns,
String keyColumn,
String valueColumn
)
{
this(columns, keyColumn, valueColumn, false, 0);
}
@JsonProperty
public List<String> getColumns()
{
@ -373,7 +384,9 @@ public class URIExtractionNamespace implements ExtractionNamespace
@JsonProperty("delimiter") String delimiter,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("keyColumn") final String keyColumn,
@JsonProperty("valueColumn") final String valueColumn
@JsonProperty("valueColumn") final String valueColumn,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
Preconditions.checkArgument(
@ -382,7 +395,9 @@ public class URIExtractionNamespace implements ExtractionNamespace
);
final DelimitedParser delegate = new DelimitedParser(
Optional.fromNullable(Strings.emptyToNull(delimiter)),
Optional.fromNullable(Strings.emptyToNull(listDelimiter))
Optional.fromNullable(Strings.emptyToNull(listDelimiter)),
hasHeaderRow,
skipHeaderRows
);
Preconditions.checkArgument(
!(Strings.isNullOrEmpty(keyColumn) ^ Strings.isNullOrEmpty(valueColumn)),
@ -410,6 +425,18 @@ public class URIExtractionNamespace implements ExtractionNamespace
this.parser = new DelegateParser(delegate, this.keyColumn, this.valueColumn);
}
@VisibleForTesting
TSVFlatDataParser(
List<String> columns,
String delimiter,
String listDelimiter,
String keyColumn,
String valueColumn
)
{
this(columns, delimiter, listDelimiter, keyColumn, valueColumn, false, 0);
}
@JsonProperty
public List<String> getColumns()
{


@ -346,7 +346,9 @@ public class BatchDeltaIngestionTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
ImmutableList.of("timestamp", "host", "host2", "visited_num"),
false,
0
),
null
),


@ -54,7 +54,8 @@ public class DetermineHashedPartitionsJobTest
private int errorMargin;
@Parameterized.Parameters(name = "File={0}, TargetPartitionSize={1}, Interval={2}, ErrorMargin={3}, NumTimeBuckets={4}, NumShards={5}")
public static Collection<?> data(){
public static Collection<?> data()
{
int[] first = new int[1];
Arrays.fill(first, 13);
int[] second = new int[6];
@ -116,7 +117,12 @@ public class DetermineHashedPartitionsJobTest
new DelimitedParseSpec(
new TimestampSpec("ts", null, null),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("market", "quality", "placement", "placementish")),
DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"market",
"quality",
"placement",
"placementish"
)),
null,
null
),
@ -129,7 +135,9 @@ public class DetermineHashedPartitionsJobTest
"placement",
"placementish",
"index"
)
),
false,
0
),
null
),
@ -176,7 +184,8 @@ public class DetermineHashedPartitionsJobTest
}
@Test
public void testDetermineHashedPartitions(){
public void testDetermineHashedPartitions()
{
DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig);
determineHashedPartitionsJob.run();
Map<Long, List<HadoopyShardSpec>> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs();
@ -184,8 +193,8 @@ public class DetermineHashedPartitionsJobTest
expectedNumTimeBuckets,
shardSpecs.entrySet().size()
);
int i=0;
for(Map.Entry<Long, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
int i = 0;
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
Assert.assertEquals(
expectedNumOfShards[i++],
entry.getValue().size(),


@ -227,7 +227,9 @@ public class DeterminePartitionsJobTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
ImmutableList.of("timestamp", "host", "country", "visited_num"),
false,
0
),
null
),


@ -142,7 +142,9 @@ public class IndexGeneratorJobTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
ImmutableList.of("timestamp", "host", "visited_num"),
false,
0
),
null
),
@ -188,7 +190,9 @@ public class IndexGeneratorJobTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
ImmutableList.of("timestamp", "host", "visited_num"),
false,
0
)
),
null,
@ -233,7 +237,9 @@ public class IndexGeneratorJobTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
ImmutableList.of("timestamp", "host", "visited_num"),
false,
0
),
null
),
@ -289,7 +295,9 @@ public class IndexGeneratorJobTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
ImmutableList.of("timestamp", "host", "visited_num"),
false,
0
)
),
null,


@ -77,7 +77,9 @@ public class JobHelperTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
ImmutableList.of("timestamp", "host", "visited_num"),
false,
0
),
null
),


@ -265,7 +265,9 @@ public class DatasourcePathSpecTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
ImmutableList.of("timestamp", "host", "visited"),
false,
0
),
null
),


@ -164,7 +164,9 @@ public class HadoopConverterJobTest
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)
Arrays.asList(TestIndex.COLUMNS),
false,
0
),
null
),


@ -272,6 +272,11 @@ public class IndexTask extends AbstractTask
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
// The null inputRow means the caller must skip this row.
if (inputRow == null) {
continue;
}
final Interval interval;
if (determineIntervals) {
interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());


@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
@ -74,6 +75,24 @@ public class IndexTaskTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
Lists.newArrayList(),
Lists.newArrayList()
),
null,
Arrays.asList("ts", "dim", "val"),
false,
0
);
private final IndexSpec indexSpec;
private final ObjectMapper jsonMapper;
private IndexMerger indexMerger;
@ -107,7 +126,7 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, 2, null, false, false),
createIngestionSpec(tmpDir, null, null, 2, null, false, false),
null,
jsonMapper
);
@ -145,7 +164,7 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, 2, null, true, false),
createIngestionSpec(tmpDir, null, null, 2, null, true, false),
null,
jsonMapper
);
@ -185,6 +204,7 @@ public class IndexTaskTest
null,
createIngestionSpec(
tmpDir,
null,
new ArbitraryGranularitySpec(
Granularities.MINUTE,
Arrays.asList(new Interval("2014/2015"))
@ -220,6 +240,7 @@ public class IndexTaskTest
null,
createIngestionSpec(
tmpDir,
null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.HOUR,
@ -254,7 +275,7 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, null, 1, false, false),
createIngestionSpec(tmpDir, null, null, null, 1, false, false),
null,
jsonMapper
);
@ -285,7 +306,7 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, 2, null, false, true),
createIngestionSpec(tmpDir, null, null, 2, null, false, true),
null,
jsonMapper
);
@ -323,6 +344,7 @@ public class IndexTaskTest
null,
createIngestionSpec(
tmpDir,
null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
@ -357,6 +379,112 @@ public class IndexTaskTest
Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum());
}
@Test
public void testCSVFileWithHeader() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("time,d,val");
writer.println("2014-01-01T00:00:10Z,a,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
new CSVParseSpec(
new TimestampSpec(
"time",
"auto",
null
),
new DimensionsSpec(
null,
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
null,
true,
0
),
null,
2,
null,
false,
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions());
Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics());
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
}
@Test
public void testCSVFileWithHeaderColumnOverride() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("time,d,val");
writer.println("2014-01-01T00:00:10Z,a,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
new CSVParseSpec(
new TimestampSpec(
"time",
"auto",
null
),
new DimensionsSpec(
null,
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("time", "dim", "val"),
true,
0
),
null,
2,
null,
false,
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Arrays.asList("dim"), segments.get(0).getDimensions());
Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics());
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
}
private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
{
final List<DataSegment> segments = Lists.newArrayList();
@ -434,6 +562,7 @@ public class IndexTaskTest
private IndexTask.IndexIngestionSpec createIngestionSpec(
File baseDir,
ParseSpec parseSpec,
GranularitySpec granularitySpec,
Integer targetPartitionSize,
Integer numShards,
@ -446,20 +575,7 @@ public class IndexTaskTest
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
),
parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC,
null
),
Map.class


@ -19,6 +19,7 @@
package io.druid.java.util.common.parsers;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
@ -33,24 +34,17 @@ import java.util.Map;
public class CSVParser implements Parser<String, Object>
{
private final String listDelimiter;
private final Splitter listSplitter;
private final Function<String, Object> valueFunction;
private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser();
private ArrayList<String> fieldNames = null;
public CSVParser(final Optional<String> listDelimiter)
private static final Function<String, Object> getValueFunction(
final String listDelimiter,
final Splitter listSplitter
)
{
this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER;
this.listSplitter = Splitter.on(this.listDelimiter);
this.valueFunction = new Function<String, Object>()
return new Function<String, Object>()
{
@Override
public Object apply(String input)
{
if (input.contains(CSVParser.this.listDelimiter)) {
if (input.contains(listDelimiter)) {
return Lists.newArrayList(
Iterables.transform(
listSplitter.split(input),
@ -64,16 +58,49 @@ public class CSVParser implements Parser<String, Object>
};
}
public CSVParser(final Optional<String> listDelimiter, final Iterable<String> fieldNames)
private final String listDelimiter;
private final Splitter listSplitter;
private final Function<String, Object> valueFunction;
private final boolean hasHeaderRow;
private final int maxSkipHeaderRows;
private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser();
private ArrayList<String> fieldNames = null;
private boolean hasParsedHeader = false;
private int skippedHeaderRows;
private boolean supportSkipHeaderRows;
public CSVParser(
final Optional<String> listDelimiter,
final boolean hasHeaderRow,
final int maxSkipHeaderRows
)
{
this(listDelimiter);
this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER;
this.listSplitter = Splitter.on(this.listDelimiter);
this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter);
this.hasHeaderRow = hasHeaderRow;
this.maxSkipHeaderRows = maxSkipHeaderRows;
}
public CSVParser(
final Optional<String> listDelimiter,
final Iterable<String> fieldNames,
final boolean hasHeaderRow,
final int maxSkipHeaderRows
)
{
this(listDelimiter, hasHeaderRow, maxSkipHeaderRows);
setFieldNames(fieldNames);
}
public CSVParser(final Optional<String> listDelimiter, final String header)
@VisibleForTesting
CSVParser(final Optional<String> listDelimiter, final String header)
{
this(listDelimiter);
this(listDelimiter, false, 0);
setFieldNames(header);
}
@ -83,6 +110,14 @@ public class CSVParser implements Parser<String, Object>
return listDelimiter;
}
@Override
public void startFileFromBeginning()
{
supportSkipHeaderRows = true;
hasParsedHeader = false;
skippedHeaderRows = 0;
}
@Override
public List<String> getFieldNames()
{
@ -92,8 +127,10 @@ public class CSVParser implements Parser<String, Object>
@Override
public void setFieldNames(final Iterable<String> fieldNames)
{
ParserUtils.validateFields(fieldNames);
this.fieldNames = Lists.newArrayList(fieldNames);
if (fieldNames != null) {
ParserUtils.validateFields(fieldNames);
this.fieldNames = Lists.newArrayList(fieldNames);
}
}
public void setFieldNames(final String header)
@ -109,9 +146,27 @@ public class CSVParser implements Parser<String, Object>
@Override
public Map<String, Object> parse(final String input)
{
if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) {
throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. "
+ "Please check the indexTask supports these options.");
}
try {
String[] values = parser.parseLine(input);
if (skippedHeaderRows < maxSkipHeaderRows) {
skippedHeaderRows++;
return null;
}
if (hasHeaderRow && !hasParsedHeader) {
if (fieldNames == null) {
setFieldNames(Arrays.asList(values));
}
hasParsedHeader = true;
return null;
}
if (fieldNames == null) {
setFieldNames(ParserUtils.generateFieldNames(values.length));
}


@ -19,6 +19,7 @@
package io.druid.java.util.common.parsers;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@ -36,16 +37,44 @@ public class DelimitedParser implements Parser<String, Object>
{
private static final String DEFAULT_DELIMITER = "\t";
private static Function<String, Object> getValueFunction(
final String listDelimiter,
final Splitter listSplitter
)
{
return (input) -> {
if (input.contains(listDelimiter)) {
return Lists.newArrayList(
Iterables.transform(
listSplitter.split(input),
ParserUtils.nullEmptyStringFunction
)
);
} else {
return ParserUtils.nullEmptyStringFunction.apply(input);
}
};
}
private final String delimiter;
private final String listDelimiter;
private final Splitter splitter;
private final Splitter listSplitter;
private final Function<String, Object> valueFunction;
private final boolean hasHeaderRow;
private final int maxSkipHeaderRows;
private ArrayList<String> fieldNames = null;
private boolean hasParsedHeader = false;
private int skippedHeaderRows;
private boolean supportSkipHeaderRows;
public DelimitedParser(final Optional<String> delimiter, Optional<String> listDelimiter)
public DelimitedParser(
final Optional<String> delimiter,
final Optional<String> listDelimiter,
final boolean hasHeaderRow,
final int maxSkipHeaderRows
)
{
this.delimiter = delimiter.isPresent() ? delimiter.get() : DEFAULT_DELIMITER;
this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER;
@ -58,40 +87,28 @@ public class DelimitedParser implements Parser<String, Object>
this.splitter = Splitter.on(this.delimiter);
this.listSplitter = Splitter.on(this.listDelimiter);
this.valueFunction = new Function<String, Object>()
{
@Override
public Object apply(String input)
{
if (input.contains(DelimitedParser.this.listDelimiter)) {
return Lists.newArrayList(
Iterables.transform(
listSplitter.split(input),
ParserUtils.nullEmptyStringFunction
)
);
} else {
return ParserUtils.nullEmptyStringFunction.apply(input);
}
}
};
this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter);
this.hasHeaderRow = hasHeaderRow;
this.maxSkipHeaderRows = maxSkipHeaderRows;
}
public DelimitedParser(
final Optional<String> delimiter,
final Optional<String> listDelimiter,
final Iterable<String> fieldNames
final Iterable<String> fieldNames,
final boolean hasHeaderRow,
final int maxSkipHeaderRows
)
{
this(delimiter, listDelimiter);
this(delimiter, listDelimiter, hasHeaderRow, maxSkipHeaderRows);
setFieldNames(fieldNames);
}
public DelimitedParser(final Optional<String> delimiter, final Optional<String> listDelimiter, final String header)
@VisibleForTesting
DelimitedParser(final Optional<String> delimiter, final Optional<String> listDelimiter, final String header)
{
this(delimiter, listDelimiter);
this(delimiter, listDelimiter, false, 0);
setFieldNames(header);
}
@ -106,6 +123,14 @@ public class DelimitedParser implements Parser<String, Object>
return listDelimiter;
}
@Override
public void startFileFromBeginning()
{
supportSkipHeaderRows = true;
hasParsedHeader = false;
skippedHeaderRows = 0;
}
@Override
public List<String> getFieldNames()
{
@ -115,8 +140,10 @@ public class DelimitedParser implements Parser<String, Object>
@Override
public void setFieldNames(final Iterable<String> fieldNames)
{
ParserUtils.validateFields(fieldNames);
this.fieldNames = Lists.newArrayList(fieldNames);
if (fieldNames != null) {
ParserUtils.validateFields(fieldNames);
this.fieldNames = Lists.newArrayList(fieldNames);
}
}
public void setFieldNames(String header)
@ -132,9 +159,27 @@ public class DelimitedParser implements Parser<String, Object>
@Override
public Map<String, Object> parse(final String input)
{
if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) {
throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. "
+ "Please check the indexTask supports these options.");
}
try {
Iterable<String> values = splitter.split(input);
if (skippedHeaderRows < maxSkipHeaderRows) {
skippedHeaderRows++;
return null;
}
if (hasHeaderRow && !hasParsedHeader) {
if (fieldNames == null) {
setFieldNames(values);
}
hasParsedHeader = true;
return null;
}
if (fieldNames == null) {
setFieldNames(ParserUtils.generateFieldNames(Iterators.size(values.iterator())));
}


@ -238,7 +238,7 @@ public class JSONPathParser implements Parser<String, Object>
/**
* Specifies a field to be added to the parsed object Map, using JsonPath notation.
*
* <p>
* See <a href="https://github.com/jayway/JsonPath">https://github.com/jayway/JsonPath</a> for more information.
*/
public static class FieldSpec
@ -281,5 +281,4 @@ public class JSONPathParser implements Parser<String, Object>
return expr;
}
}
}


@ -19,6 +19,7 @@
package io.druid.java.util.common.parsers;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@ -28,11 +29,21 @@ import java.util.Map;
public interface Parser<K, V>
{
/**
* Parse a String into a Map.
* This method may or may not be called at the start of reading each file, depending on the type of IndexTask.
* Any parser state should be reset here if it exists.
*/
default void startFileFromBeginning()
{
}
/**
* Parse a String into a Map. The result can be null, which means the given input string will be ignored.
*
* @throws ParseException if the String cannot be parsed
*/
public Map<K, V> parse(String input);
@Nullable
Map<K, V> parse(String input);
/**
* Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be
@ -40,12 +51,12 @@ public interface Parser<K, V>
* parser) and those parsers have their own way of setting field names.
*/
@Deprecated
public void setFieldNames(Iterable<String> fieldNames);
void setFieldNames(Iterable<String> fieldNames);
/**
* Returns the fieldNames that we expect to see in parsed Maps, if known, or null otherwise. Deprecated; Parsers
* should not, in general, be expected to know what fields they will return.
*/
@Deprecated
public List<String> getFieldNames();
List<String> getFieldNames();
}


@ -117,7 +117,6 @@ public class RegexParser implements Parser<String, Object>
}
@Override
public List<String> getFieldNames()
{
return fieldNames;


@ -53,6 +53,12 @@ public class ToLowerCaseParser implements Parser<String, Object>
return retVal;
}
@Override
public void startFileFromBeginning()
{
baseParser.startFileFromBeginning();
}
@Override
public void setFieldNames(Iterable<String> fieldNames)
{


@ -21,7 +21,7 @@ package io.druid.java.util.common.parsers;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
@ -80,7 +80,7 @@ public class CSVParserTest
@Test
public void testCSVParserWithoutHeader()
{
final Parser<String, Object> csvParser = new CSVParser(Optional.<String>fromNullable(null));
final Parser<String, Object> csvParser = new CSVParser(Optional.<String>fromNullable(null), false, 0);
String body = "hello,world,foo";
final Map<String, Object> jsonMap = csvParser.parse(body);
Assert.assertEquals(
@ -89,4 +89,48 @@ public class CSVParserTest
jsonMap
);
}
@Test
public void testCSVParserWithSkipHeaderRows()
{
final int skipHeaderRows = 2;
final Parser<String, Object> csvParser = new CSVParser(
Optional.absent(),
false,
skipHeaderRows
);
csvParser.startFileFromBeginning();
final String[] body = new String[] {
"header,line,1",
"header,line,2",
"hello,world,foo"
};
int index;
for (index = 0; index < skipHeaderRows; index++) {
Assert.assertNull(csvParser.parse(body[index]));
}
final Map<String, Object> jsonMap = csvParser.parse(body[index]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"),
jsonMap
);
}
@Test(expected = UnsupportedOperationException.class)
public void testCSVParserWithoutStartFileFromBeginning()
{
final int skipHeaderRows = 2;
final Parser<String, Object> csvParser = new CSVParser(
Optional.absent(),
false,
skipHeaderRows
);
final String[] body = new String[] {
"header\tline\t1",
"header\tline\t2",
"hello\tworld\tfoo"
};
csvParser.parse(body[0]);
}
}


@ -21,7 +21,7 @@ package io.druid.java.util.common.parsers;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
@ -67,7 +67,11 @@ public class DelimitedParserTest
public void testTSVParserWithHeader()
{
String header = "time\tvalue1\tvalue2";
final Parser<String, Object> delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.<String>absent(), header);
final Parser<String, Object> delimitedParser = new DelimitedParser(
Optional.of("\t"),
Optional.<String>absent(),
header
);
String body = "hello\tworld\tfoo";
final Map<String, Object> jsonMap = delimitedParser.parse(body);
Assert.assertEquals(
@ -80,7 +84,12 @@ public class DelimitedParserTest
@Test
public void testTSVParserWithoutHeader()
{
final Parser<String, Object> delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.<String>absent());
final Parser<String, Object> delimitedParser = new DelimitedParser(
Optional.of("\t"),
Optional.<String>absent(),
false,
0
);
String body = "hello\tworld\tfoo";
final Map<String, Object> jsonMap = delimitedParser.parse(body);
Assert.assertEquals(
@ -89,4 +98,50 @@ public class DelimitedParserTest
jsonMap
);
}
@Test
public void testTSVParserWithSkipHeaderRows()
{
final int skipHeaderRows = 2;
final Parser<String, Object> delimitedParser = new DelimitedParser(
Optional.of("\t"),
Optional.absent(),
false,
skipHeaderRows
);
delimitedParser.startFileFromBeginning();
final String[] body = new String[] {
"header\tline\t1",
"header\tline\t2",
"hello\tworld\tfoo"
};
int index;
for (index = 0; index < skipHeaderRows; index++) {
Assert.assertNull(delimitedParser.parse(body[index]));
}
final Map<String, Object> jsonMap = delimitedParser.parse(body[index]);
Assert.assertEquals(
"jsonMap",
ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"),
jsonMap
);
}
@Test(expected = UnsupportedOperationException.class)
public void testTSVParserWithoutStartFileFromBeginning()
{
final int skipHeaderRows = 2;
final Parser<String, Object> delimitedParser = new DelimitedParser(
Optional.of("\t"),
Optional.absent(),
false,
skipHeaderRows
);
final String[] body = new String[] {
"header\tline\t1",
"header\tline\t2",
"hello\tworld\tfoo"
};
delimitedParser.parse(body[0]);
}
}


@ -130,7 +130,9 @@ public class MultiValuedDimensionTest
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null),
"\t",
ImmutableList.of("timestamp", "product", "tags")
ImmutableList.of("timestamp", "product", "tags"),
false,
0
),
"UTF-8"
);


@ -146,7 +146,9 @@ public class GroupByQueryRunnerFactoryTest
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null),
"\t",
ImmutableList.of("timestamp", "product", "tags")
ImmutableList.of("timestamp", "product", "tags"),
false,
0
),
"UTF-8"
);


@ -291,7 +291,9 @@ public class TestIndex
new DimensionsSpec(DIMENSION_SCHEMAS, null, null),
"\t",
"\u0001",
Arrays.asList(COLUMNS)
Arrays.asList(COLUMNS),
false,
0
)
, "utf8"
);


@ -162,6 +162,16 @@ public class ReplayableFirehoseFactory implements FirehoseFactory<InputRowParser
while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) {
try {
InputRow row = delegateFirehose.nextRow();
// delegateFirehose may return a null row if the underlying parser returns null.
// Such rows should also be written, but deserializing null values causes an error of
// 'com.fasterxml.jackson.databind.RuntimeJsonMappingException: Can not deserialize instance of io.druid.data.input.MapBasedRow out of VALUE_NULL token'
// Since ReplayableFirehoseFactory is planned to be removed in https://github.com/druid-io/druid/pull/4193,
// we simply skip null rows for now.
if (row == null) {
continue;
}
generator.writeObject(row);
dimensionScratch.addAll(row.getDimensions());
counter++;


@ -108,7 +108,9 @@ public class IngestSegmentFirehoseTest
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
ImmutableList.of("timestamp", "host", "visited"),
false,
0
),
Charsets.UTF_8.toString()
);