diff --git a/api/src/main/java/io/druid/data/input/Firehose.java b/api/src/main/java/io/druid/data/input/Firehose.java index a768e778d81..4f4c640f104 100644 --- a/api/src/main/java/io/druid/data/input/Firehose.java +++ b/api/src/main/java/io/druid/data/input/Firehose.java @@ -19,6 +19,7 @@ package io.druid.data.input; +import javax.annotation.Nullable; import java.io.Closeable; /** @@ -46,14 +47,16 @@ public interface Firehose extends Closeable * * @return true if and when there is another row available, false if the stream has dried up */ - public boolean hasMore(); + boolean hasMore(); /** * The next row available. Should only be called if hasMore returns true. + * The return value can be null which means the caller must skip this row. * * @return The next row */ - public InputRow nextRow(); + @Nullable + InputRow nextRow(); /** * Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is @@ -74,5 +77,5 @@ public interface Firehose extends Closeable * because of InputRows delivered by prior calls to ##nextRow(). *

*/ - public Runnable commit(); + Runnable commit(); } diff --git a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java index bbe1fc4d228..a4b09a2010a 100644 --- a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.CSVParser; import io.druid.java.util.common.parsers.Parser; @@ -35,26 +34,38 @@ public class CSVParseSpec extends ParseSpec { private final String listDelimiter; private final List columns; + private final boolean hasHeaderRow; + private final int skipHeaderRows; @JsonCreator public CSVParseSpec( @JsonProperty("timestampSpec") TimestampSpec timestampSpec, @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { super(timestampSpec, dimensionsSpec); this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); - for (String column : columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - this.columns = columns; + this.hasHeaderRow = hasHeaderRow; + this.skipHeaderRows = skipHeaderRows; - verify(dimensionsSpec.getDimensionNames()); + if (columns != null) { + for (String column : columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." 
+ ); + } } @JsonProperty @@ -69,6 +80,18 @@ public class CSVParseSpec extends ParseSpec return columns; } + @JsonProperty + public boolean isHasHeaderRow() + { + return hasHeaderRow; + } + + @JsonProperty("skipHeaderRows") + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + @Override public void verify(List usedCols) { @@ -80,23 +103,23 @@ public class CSVParseSpec extends ParseSpec @Override public Parser makeParser() { - return new CSVParser(Optional.fromNullable(listDelimiter), columns); + return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow, skipHeaderRows); } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns); + return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow, skipHeaderRows); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns); + return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow, skipHeaderRows); } public ParseSpec withColumns(List cols) { - return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols); + return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow, skipHeaderRows); } } diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index 6d7096d7921..c3383eb351c 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.Parser; @@ -36,6 +35,8 @@ public class DelimitedParseSpec extends ParseSpec private final String delimiter; private final String listDelimiter; private final List columns; + private final boolean hasHeaderRow; + private final int skipHeaderRows; @JsonCreator public DelimitedParseSpec( @@ -43,20 +44,31 @@ public class DelimitedParseSpec extends ParseSpec @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { super(timestampSpec, dimensionsSpec); this.delimiter = delimiter; this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); this.columns = columns; - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } + this.hasHeaderRow = hasHeaderRow; + this.skipHeaderRows = skipHeaderRows; - verify(dimensionsSpec.getDimensionNames()); + if (columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must 
be set to true." + ); + } } @JsonProperty("delimiter") @@ -77,6 +89,18 @@ public class DelimitedParseSpec extends ParseSpec return columns; } + @JsonProperty + public boolean isHasHeaderRow() + { + return hasHeaderRow; + } + + @JsonProperty("skipHeaderRows") + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + @Override public void verify(List usedCols) { @@ -88,38 +112,79 @@ public class DelimitedParseSpec extends ParseSpec @Override public Parser makeParser() { - Parser retVal = new DelimitedParser( + return new DelimitedParser( Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter) + Optional.fromNullable(listDelimiter), + columns, + hasHeaderRow, + skipHeaderRows ); - retVal.setFieldNames(columns); - return retVal; } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns); + return new DelimitedParseSpec( + spec, + getDimensionsSpec(), + delimiter, + listDelimiter, + columns, + hasHeaderRow, + skipHeaderRows + ); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns); + return new DelimitedParseSpec( + getTimestampSpec(), + spec, + delimiter, + listDelimiter, + columns, + hasHeaderRow, + skipHeaderRows + ); } public ParseSpec withDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns); + return new DelimitedParseSpec( + getTimestampSpec(), + getDimensionsSpec(), + delim, + listDelimiter, + columns, + hasHeaderRow, + skipHeaderRows + ); } public ParseSpec withListDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns); + return new DelimitedParseSpec( + getTimestampSpec(), + getDimensionsSpec(), + delimiter, + delim, + columns, + hasHeaderRow, + skipHeaderRows + ); } public ParseSpec withColumns(List cols) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols); + return new DelimitedParseSpec( + getTimestampSpec(), + getDimensionsSpec(), + delimiter, + listDelimiter, + cols, + hasHeaderRow, + skipHeaderRows + ); } } diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index 97e33f04a89..648831bece3 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -19,7 +19,6 @@ package io.druid.data.input.impl; -import com.google.common.base.Throwables; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.utils.Runnables; @@ -27,6 +26,7 @@ import org.apache.commons.io.LineIterator; import java.io.IOException; import java.util.Iterator; +import java.util.NoSuchElementException; /** */ @@ -50,7 +50,7 @@ public class FileIteratingFirehose implements Firehose public boolean hasMore() { while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { - lineIterator = lineIterators.next(); + lineIterator = getNextLineIterator(); } return lineIterator != null && lineIterator.hasNext(); @@ -59,21 +59,22 @@ public class FileIteratingFirehose implements Firehose @Override public InputRow nextRow() { - try { - if (lineIterator == null || !lineIterator.hasNext()) { - // Close old streams, maybe. 
- if (lineIterator != null) { - lineIterator.close(); - } - - lineIterator = lineIterators.next(); - } - - return parser.parse(lineIterator.next()); + if (!hasMore()) { + throw new NoSuchElementException(); } - catch (Exception e) { - throw Throwables.propagate(e); + + return parser.parse(lineIterator.next()); + } + + private LineIterator getNextLineIterator() + { + if (lineIterator != null) { + lineIterator.close(); } + + final LineIterator iterator = lineIterators.next(); + parser.startFileFromBeginning(); + return iterator; } @Override diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 6a13fcb7bd1..a640ef10ac4 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -22,12 +22,12 @@ package io.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; - import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.InputRow; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.common.parsers.Parser; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; @@ -124,18 +124,30 @@ public class StringInputRowParser implements ByteBufferInputRowParser return theMap; } - private Map parseString(String inputString) + public void startFileFromBeginning() { - return parser.parse(inputString); + parser.startFileFromBeginning(); } - public InputRow parse(String input) + @Nullable + public InputRow parse(@Nullable String input) { return parseMap(parseString(input)); } - private InputRow parseMap(Map theMap) + @Nullable + private Map parseString(@Nullable String inputString) { + return parser.parse(inputString); + } + + @Nullable + private InputRow parseMap(@Nullable Map theMap) + { + // If a header is present in the data (and with proper configurations), a null is returned + if (theMap == null) { + return null; + } return mapParser.parse(theMap); } } diff --git a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java index e6740cb63f4..ec1fdb2a040 100644 --- a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java @@ -21,7 +21,6 @@ package io.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import io.druid.java.util.common.parsers.Parser; import java.util.List; diff --git a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java index 7d99a2b804d..e9143344c64 100644 --- a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java @@ -41,7 +41,9 @@ public class CSVParseSpecTest Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } @@ -60,7 +62,9 @@ public class CSVParseSpecTest Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java index 
1ebad8c6414..e94ba373fa6 100644 --- a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java @@ -40,7 +40,9 @@ public class DelimitedParseSpecTest new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("abc")), null, null), "\u0001", "\u0002", - Arrays.asList("abc") + Arrays.asList("abc"), + false, + 0 ); final DelimitedParseSpec serde = jsonMapper.readValue( jsonMapper.writeValueAsString(spec), @@ -71,7 +73,9 @@ public class DelimitedParseSpecTest ), ",", " ", - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } @@ -91,12 +95,15 @@ public class DelimitedParseSpecTest ), ",", null, - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } - @Test(expected = NullPointerException.class) - public void testDefaultColumnList(){ + @Test(expected = IllegalArgumentException.class) + public void testDefaultColumnList() + { final DelimitedParseSpec spec = new DelimitedParseSpec( new TimestampSpec( "timestamp", @@ -110,8 +117,9 @@ public class DelimitedParseSpecTest ), ",", null, - // pass null columns not allowed - null + null, + false, + 0 ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java index 87f5d20fcd5..30239233bcb 100644 --- a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java +++ b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java @@ -19,67 +19,107 @@ package io.druid.data.input.impl; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; - -import io.druid.java.util.common.Pair; -import junit.framework.Assert; - +import io.druid.data.input.InputRow; import org.apache.commons.io.LineIterator; +import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import java.io.IOException; import java.io.StringReader; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +@RunWith(Parameterized.class) public class FileIteratingFirehoseTest { - private static final List>> fixtures = ImmutableList.of( - Pair.of(new String[]{"2000,foo"}, ImmutableList.of("foo")), - Pair.of(new String[]{"2000,foo\n2000,bar\n"}, ImmutableList.of("foo", "bar")), - Pair.of(new String[]{"2000,foo\n2000,bar\n", "2000,baz"}, ImmutableList.of("foo", "bar", "baz")), - Pair.of(new String[]{"2000,foo\n2000,bar\n", "", "2000,baz"}, ImmutableList.of("foo", "bar", "baz")), - Pair.of(new String[]{"2000,foo\n2000,bar\n", "", "2000,baz", ""}, ImmutableList.of("foo", "bar", "baz")), - Pair.of(new String[]{""}, ImmutableList.of()), - Pair.of(new String[]{}, ImmutableList.of()) - ); + @Parameters(name = "{0}, {1}") + public static Collection constructorFeeder() throws IOException + { + final List> inputTexts = ImmutableList.of( + ImmutableList.of("2000,foo"), + ImmutableList.of("2000,foo\n2000,bar\n"), + ImmutableList.of("2000,foo\n2000,bar\n", "2000,baz"), + ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz"), + ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz", ""), + ImmutableList.of("2000,foo\n2000,bar\n2000,baz", "", "2000,baz", "2000,foo\n2000,bar\n3000,baz"), + ImmutableList.of(""), + 
ImmutableList.of() + ); + + final List args = new ArrayList<>(); + for (int numSkipHeadRows = 0; numSkipHeadRows < 3; numSkipHeadRows++) { + for (List texts : inputTexts) { + args.add(new Object[] {texts, numSkipHeadRows}); + } + } + + return args; + } + + private final StringInputRowParser parser; + private final List inputs; + private final List expectedResults; + + public FileIteratingFirehoseTest(List texts, int numSkipHeaderRows) + { + parser = new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null), + ",", + ImmutableList.of("ts", "x"), + false, + numSkipHeaderRows + ), + null + ); + + this.inputs = texts; + this.expectedResults = inputs.stream() + .map(input -> input.split("\n")) + .flatMap(lines -> { + final List filteredLines = Arrays.asList(lines).stream() + .filter(line -> line.length() > 0) + .map(line -> line.split(",")[1]) + .collect(Collectors.toList()); + + final int numRealSkippedRows = Math.min(filteredLines.size(), numSkipHeaderRows); + IntStream.range(0, numRealSkippedRows).forEach(i -> filteredLines.set(i, null)); + return filteredLines.stream(); + }) + .collect(Collectors.toList()); + } @Test public void testFirehose() throws Exception { - for (Pair> fixture : fixtures) { - final List lineIterators = Lists.transform( - Arrays.asList(fixture.lhs), - new Function() - { - @Override - public LineIterator apply(String s) - { - return new LineIterator(new StringReader(s)); - } - } - ); + final List lineIterators = inputs.stream() + .map(s -> new LineIterator(new StringReader(s))) + .collect(Collectors.toList()); - final StringInputRowParser parser = new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("ts", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null), - ",", - ImmutableList.of("ts", "x") - ), - null - ); - - final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser); + try (final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser)) { final List results = Lists.newArrayList(); while (firehose.hasMore()) { - results.add(Joiner.on("|").join(firehose.nextRow().getDimension("x"))); + final InputRow inputRow = firehose.nextRow(); + if (inputRow == null) { + results.add(null); + } else { + results.add(Joiner.on("|").join(inputRow.getDimension("x"))); + } } - Assert.assertEquals(fixture.rhs, results); + Assert.assertEquals(expectedResults, results); } } } diff --git a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java index 4b38453fe35..01da54cd0fd 100644 --- a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java @@ -45,7 +45,9 @@ public class ParseSpecTest ), ",", " ", - Arrays.asList("a", "b") + Arrays.asList("a", "b"), + false, + 0 ); } @@ -65,7 +67,9 @@ public class ParseSpecTest ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false, + 0 ); } @@ -85,7 +89,9 @@ public class ParseSpecTest ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false, + 0 ); } } diff --git a/docs/content/development/extensions-core/lookups-cached-global.md b/docs/content/development/extensions-core/lookups-cached-global.md index 72d346201c1..385cc07ad7c 100644 --- a/docs/content/development/extensions-core/lookups-cached-global.md +++ 
b/docs/content/development/extensions-core/lookups-cached-global.md @@ -195,12 +195,17 @@ The `namespaceParseSpec` can be one of a number of values. Each of the examples Only ONE file which matches the search will be used. For most implementations, the discriminator for choosing the URIs is by whichever one reports the most recent timestamp for its modification time. ### csv lookupParseSpec - |Parameter|Description|Required|Default| |---------|-----------|--------|-------| -|`columns`|The list of columns in the csv file|yes|`null`| +|`columns`|The list of columns in the csv file|no if `hasHeaderRow` is set|`null`| |`keyColumn`|The name of the column containing the key|no|The first column| |`valueColumn`|The name of the column containing the value|no|The second column| +|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false| +|`skipHeaderRows`|Number of header rows to be skipped|no|0| + +If both `skipHeaderRows` and `hasHeaderRow` options are set, `skipHeaderRows` is first applied. For example, if you set +`skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will skip the first two lines and then extract column information +from the third line. *example input* @@ -222,15 +227,19 @@ truck,something3,buck ``` ### tsv lookupParseSpec - |Parameter|Description|Required|Default| |---------|-----------|--------|-------| -|`columns`|The list of columns in the csv file|yes|`null`| +|`columns`|The list of columns in the tsv file|no if `hasHeaderRow` is set|`null`| |`keyColumn`|The name of the column containing the key|no|The first column| |`valueColumn`|The name of the column containing the value|no|The second column| |`delimiter`|The delimiter in the file|no|tab (`\t`)| |`listDelimiter`|The list delimiter in the file|no| (`\u0001`)| +|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false| +|`skipHeaderRows`|Number of header rows to be skipped|no|0| + +If both `skipHeaderRows` and `hasHeaderRow` options are set, `skipHeaderRows` is first applied. For example, if you set +`skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will skip the first two lines and then extract column information +from the third line. *example input* diff --git a/docs/content/ingestion/data-formats.md b/docs/content/ingestion/data-formats.md index 95807cc60a6..859f75e12be 100644 --- a/docs/content/ingestion/data-formats.md +++ b/docs/content/ingestion/data-formats.md @@ -72,10 +72,8 @@ If you have nested JSON, [Druid can automatically flatten it for you](flatten-js ### CSV -Since the CSV data cannot contain the column names (no header is allowed), these must be added before that data can be processed: - ```json - "parseSpec":{ + "parseSpec": { "format" : "csv", "timestampSpec" : { "column" : "timestamp" }, @@ -87,12 +85,27 @@ Since the CSV data cannot contain the column names (no header is allowed), these } ``` -The `columns` field must match the columns of your input data in the same order. +#### CSV Index Tasks -### TSV +If your input files contain a header, the `columns` field is optional and you don't need to set it. +Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header. +Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data in the same order. + +Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set, +`skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will +skip the first two lines and then extract column information from the third line. + +Note that `hasHeaderRow` and `skipHeaderRows` are effective only for non-Hadoop batch index tasks. Other types of index +tasks will fail with an exception. + 
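For example, here is a minimal sketch of a `csv` parseSpec for a native index task that discovers its columns from the file itself; the timestamp column name `time` and the empty `dimensions` list are illustrative values, not requirements:

```json
"parseSpec": {
  "format" : "csv",
  "timestampSpec" : {
    "column" : "time"
  },
  "dimensionsSpec" : {
    "dimensions" : []
  },
  "hasHeaderRow" : true
}
```

Here `columns` is omitted because `hasHeaderRow` is true: the parser consumes the header line for column names and returns null for it, so the header itself is never ingested as data.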
+#### Other CSV Ingestion Tasks + +The `columns` field must be included, and the order of the fields must match the columns of your input data. + +### TSV (Delimited) ```json - "parseSpec":{ + "parseSpec": { "format" : "tsv", "timestampSpec" : { "column" : "timestamp" @@ -105,10 +118,25 @@ The `columns` field must match the columns of your input data in the same order. } ``` -The `columns` field must match the columns of your input data in the same order. - Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed. +#### TSV (Delimited) Index Tasks + +If your input files contain a header, the `columns` field is optional and you don't need to set it. +Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header. +Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data in the same order. + +Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set, +`skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will +skip the first two lines and then extract column information from the third line. See the example parseSpec at the end of this section. + +Note that `hasHeaderRow` and `skipHeaderRows` are effective only for non-Hadoop batch index tasks. Other types of index +tasks will fail with an exception. + +#### Other TSV (Delimited) Ingestion Tasks + +The `columns` field must be included, and the order of the fields must match the columns of your input data. 
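As referenced above, here is a sketch of a `tsv` parseSpec for a native index task combining both options; the `|` delimiter, the `time` column name, and the empty `dimensions` list are illustrative. With these settings, Druid drops the first two lines and reads the column names from the third:

```json
"parseSpec": {
  "format" : "tsv",
  "timestampSpec" : {
    "column" : "time"
  },
  "dimensionsSpec" : {
    "dimensions" : []
  },
  "delimiter" : "|",
  "hasHeaderRow" : true,
  "skipHeaderRows" : 2
}
```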
+ ### Regex ```json diff --git a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java index 46d99cbbcc9..d9a9966dce2 100644 --- a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java @@ -96,7 +96,9 @@ public class MapVirtualColumnTest new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim", "keys", "values")), null, null), "\t", ",", - Arrays.asList("ts", "dim", "keys", "values") + Arrays.asList("ts", "dim", "keys", "values"), + false, + 0 ) , "utf8" ); diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java index a7c27c66638..1c6bfd0447e 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java @@ -27,13 +27,13 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import io.druid.guice.annotations.Json; import io.druid.java.util.common.IAE; import io.druid.java.util.common.UOE; @@ -41,7 +41,6 @@ import io.druid.java.util.common.parsers.CSVParser; import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.JSONParser; import io.druid.java.util.common.parsers.Parser; - import org.joda.time.Period; import javax.annotation.Nullable; @@ -264,7 +263,9 @@ public class URIExtractionNamespace implements ExtractionNamespace public CSVFlatDataParser( @JsonProperty("columns") List columns, @JsonProperty("keyColumn") final String keyColumn, - @JsonProperty("valueColumn") final String valueColumn + @JsonProperty("valueColumn") final String valueColumn, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { Preconditions.checkArgument( @@ -293,12 +294,22 @@ public class URIExtractionNamespace implements ExtractionNamespace ); this.parser = new DelegateParser( - new CSVParser(Optional.absent(), columns), + new CSVParser(Optional.absent(), columns, hasHeaderRow, skipHeaderRows), this.keyColumn, this.valueColumn ); } + @VisibleForTesting + CSVFlatDataParser( + List columns, + String keyColumn, + String valueColumn + ) + { + this(columns, keyColumn, valueColumn, false, 0); + } + @JsonProperty public List getColumns() { @@ -373,7 +384,9 @@ public class URIExtractionNamespace implements ExtractionNamespace @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, @JsonProperty("keyColumn") final String keyColumn, - @JsonProperty("valueColumn") final String valueColumn + @JsonProperty("valueColumn") final String valueColumn, + @JsonProperty("hasHeaderRow") 
boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { Preconditions.checkArgument( @@ -382,7 +395,9 @@ public class URIExtractionNamespace implements ExtractionNamespace ); final DelimitedParser delegate = new DelimitedParser( Optional.fromNullable(Strings.emptyToNull(delimiter)), - Optional.fromNullable(Strings.emptyToNull(listDelimiter)) + Optional.fromNullable(Strings.emptyToNull(listDelimiter)), + hasHeaderRow, + skipHeaderRows ); Preconditions.checkArgument( !(Strings.isNullOrEmpty(keyColumn) ^ Strings.isNullOrEmpty(valueColumn)), @@ -410,6 +425,18 @@ public class URIExtractionNamespace implements ExtractionNamespace this.parser = new DelegateParser(delegate, this.keyColumn, this.valueColumn); } + @VisibleForTesting + TSVFlatDataParser( + List columns, + String delimiter, + String listDelimiter, + String keyColumn, + String valueColumn + ) + { + this(columns, delimiter, listDelimiter, keyColumn, valueColumn, false, 0); + } + @JsonProperty public List getColumns() { diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 7201bd11913..2746dd9c004 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -346,7 +346,9 @@ public class BatchDeltaIngestionTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "host2", "visited_num") + ImmutableList.of("timestamp", "host", "host2", "visited_num"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index dedd40f8064..804d162c56a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -54,7 +54,8 @@ public class DetermineHashedPartitionsJobTest private int errorMargin; @Parameterized.Parameters(name = "File={0}, TargetPartitionSize={1}, Interval={2}, ErrorMargin={3}, NumTimeBuckets={4}, NumShards={5}") - public static Collection data(){ + public static Collection data() + { int[] first = new int[1]; Arrays.fill(first, 13); int[] second = new int[6]; @@ -116,7 +117,12 @@ public class DetermineHashedPartitionsJobTest new DelimitedParseSpec( new TimestampSpec("ts", null, null), new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("market", "quality", "placement", "placementish")), + DimensionsSpec.getDefaultSchemas(ImmutableList.of( + "market", + "quality", + "placement", + "placementish" + )), null, null ), @@ -129,7 +135,9 @@ public class DetermineHashedPartitionsJobTest "placement", "placementish", "index" - ) + ), + false, + 0 ), null ), @@ -176,7 +184,8 @@ public class DetermineHashedPartitionsJobTest } @Test - public void testDetermineHashedPartitions(){ + public void testDetermineHashedPartitions() + { DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig); determineHashedPartitionsJob.run(); Map> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs(); @@ -184,8 +193,8 @@ public class DetermineHashedPartitionsJobTest expectedNumTimeBuckets, shardSpecs.entrySet().size() ); 
- int i=0; - for(Map.Entry> entry : shardSpecs.entrySet()) { + int i = 0; + for (Map.Entry> entry : shardSpecs.entrySet()) { Assert.assertEquals( expectedNumOfShards[i++], entry.getValue().size(), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 9fc2e8eb040..bf3c1c8713f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -227,7 +227,9 @@ public class DeterminePartitionsJobTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null), null, - ImmutableList.of("timestamp", "host", "country", "visited_num") + ImmutableList.of("timestamp", "host", "country", "visited_num"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 250b9dd8b04..07edcd9e32a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -142,7 +142,9 @@ public class IndexGeneratorJobTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ), null ), @@ -188,7 +190,9 @@ public class IndexGeneratorJobTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ) ), null, @@ -233,7 +237,9 @@ public class IndexGeneratorJobTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ), null ), @@ -289,7 +295,9 @@ public class IndexGeneratorJobTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ) ), null, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 8af6c470ff6..17bda8f4bcb 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -77,7 +77,9 @@ public class JobHelperTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index f1c7d558c90..a8cb0216705 100644 --- 
a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -265,7 +265,9 @@ public class DatasourcePathSpecTest new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(null, null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 4b9adba54c4..f7ff27c9c3a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -164,7 +164,9 @@ public class HadoopConverterJobTest new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null), "\t", "\u0001", - Arrays.asList(TestIndex.COLUMNS) + Arrays.asList(TestIndex.COLUMNS), + false, + 0 ), null ), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 75a86d8b84b..fd36470b111 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -272,6 +272,11 @@ public class IndexTask extends AbstractTask while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); + // The null inputRow means the caller must skip this row. + if (inputRow == null) { + continue; + } + final Interval interval; if (determineIntervals) { interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 81e85c70dba..3ed1942af86 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; @@ -74,6 +75,24 @@ public class IndexTaskTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val"), + false, + 0 + ); + private final IndexSpec indexSpec; private final ObjectMapper jsonMapper; private IndexMerger indexMerger; @@ -107,7 +126,7 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, false), + createIngestionSpec(tmpDir, null, null, 2, null, false, false), null, jsonMapper ); @@ -145,7 +164,7 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - 
createIngestionSpec(tmpDir, null, 2, null, true, false), + createIngestionSpec(tmpDir, null, null, 2, null, true, false), null, jsonMapper ); @@ -185,6 +204,7 @@ public class IndexTaskTest null, createIngestionSpec( tmpDir, + null, new ArbitraryGranularitySpec( Granularities.MINUTE, Arrays.asList(new Interval("2014/2015")) @@ -220,6 +240,7 @@ public class IndexTaskTest null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.HOUR, @@ -254,7 +275,7 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 1, false, false), + createIngestionSpec(tmpDir, null, null, null, 1, false, false), null, jsonMapper ); @@ -285,7 +306,7 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, true), + createIngestionSpec(tmpDir, null, null, 2, null, false, true), null, jsonMapper ); @@ -323,6 +344,7 @@ public class IndexTaskTest null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, @@ -357,6 +379,112 @@ public class IndexTaskTest Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); } + @Test + public void testCSVFileWithHeader() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + null, + true, + 0 + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + + @Test + public void testCSVFileWithHeaderColumnOverride() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("time", "dim", "val"), + true, + 0 + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("dim"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + private final List runTask(final IndexTask indexTask) throws Exception { final List segments = Lists.newArrayList(); @@ -434,6 +562,7 @@ public class IndexTaskTest private IndexTask.IndexIngestionSpec createIngestionSpec( File baseDir, + ParseSpec 
parseSpec, GranularitySpec granularitySpec, Integer targetPartitionSize, Integer numShards, @@ -446,20 +575,7 @@ public class IndexTaskTest "test", jsonMapper.convertValue( new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), + parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC, null ), Map.class diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index e45ed7e6acc..957e72560d2 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -19,6 +19,7 @@ package io.druid.java.util.common.parsers; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Splitter; @@ -33,24 +34,17 @@ import java.util.Map; public class CSVParser implements Parser { - private final String listDelimiter; - private final Splitter listSplitter; - private final Function valueFunction; - - private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); - - private ArrayList fieldNames = null; - - public CSVParser(final Optional listDelimiter) + private static final Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) { - this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; - this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() + return new Function() { @Override public Object apply(String input) { - if (input.contains(CSVParser.this.listDelimiter)) { + if (input.contains(listDelimiter)) { return Lists.newArrayList( Iterables.transform( listSplitter.split(input), @@ -64,16 +58,49 @@ public class CSVParser implements Parser }; } - public CSVParser(final Optional listDelimiter, final Iterable fieldNames) + private final String listDelimiter; + private final Splitter listSplitter; + private final Function valueFunction; + private final boolean hasHeaderRow; + private final int maxSkipHeaderRows; + + private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); + + private ArrayList fieldNames = null; + private boolean hasParsedHeader = false; + private int skippedHeaderRows; + private boolean supportSkipHeaderRows; + + public CSVParser( + final Optional listDelimiter, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) { - this(listDelimiter); + this.listDelimiter = listDelimiter.isPresent() ? 
listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + + this.hasHeaderRow = hasHeaderRow; + this.maxSkipHeaderRows = maxSkipHeaderRows; + } + + public CSVParser( + final Optional listDelimiter, + final Iterable fieldNames, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) + { + this(listDelimiter, hasHeaderRow, maxSkipHeaderRows); setFieldNames(fieldNames); } - public CSVParser(final Optional listDelimiter, final String header) + @VisibleForTesting + CSVParser(final Optional listDelimiter, final String header) { - this(listDelimiter); + this(listDelimiter, false, 0); setFieldNames(header); } @@ -83,6 +110,14 @@ public class CSVParser implements Parser return listDelimiter; } + @Override + public void startFileFromBeginning() + { + supportSkipHeaderRows = true; + hasParsedHeader = false; + skippedHeaderRows = 0; + } + @Override public List getFieldNames() { @@ -92,8 +127,10 @@ public class CSVParser implements Parser @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(final String header) @@ -109,9 +146,27 @@ public class CSVParser implements Parser @Override public Map parse(final String input) { + if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { + throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. " + + "Please check the indexTask supports these options."); + } + try { String[] values = parser.parseLine(input); + if (skippedHeaderRows < maxSkipHeaderRows) { + skippedHeaderRows++; + return null; + } + + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(Arrays.asList(values)); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(values.length)); } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index c37c4bda989..27114df31e9 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -19,6 +19,7 @@ package io.druid.java.util.common.parsers; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -36,16 +37,44 @@ public class DelimitedParser implements Parser { private static final String DEFAULT_DELIMITER = "\t"; + private static Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) + { + return (input) -> { + if (input.contains(listDelimiter)) { + return Lists.newArrayList( + Iterables.transform( + listSplitter.split(input), + ParserUtils.nullEmptyStringFunction + ) + ); + } else { + return ParserUtils.nullEmptyStringFunction.apply(input); + } + }; + } + private final String delimiter; private final String listDelimiter; - private final Splitter splitter; private final Splitter listSplitter; private final Function valueFunction; + private final boolean hasHeaderRow; + private final int maxSkipHeaderRows; private 
ArrayList fieldNames = null; + private boolean hasParsedHeader = false; + private int skippedHeaderRows; + private boolean supportSkipHeaderRows; - public DelimitedParser(final Optional delimiter, Optional listDelimiter) + public DelimitedParser( + final Optional delimiter, + final Optional listDelimiter, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) { this.delimiter = delimiter.isPresent() ? delimiter.get() : DEFAULT_DELIMITER; this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; @@ -58,40 +87,28 @@ public class DelimitedParser implements Parser this.splitter = Splitter.on(this.delimiter); this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() - { - @Override - public Object apply(String input) - { - if (input.contains(DelimitedParser.this.listDelimiter)) { - return Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); - } else { - return ParserUtils.nullEmptyStringFunction.apply(input); - } - } - }; + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = hasHeaderRow; + this.maxSkipHeaderRows = maxSkipHeaderRows; } public DelimitedParser( final Optional delimiter, final Optional listDelimiter, - final Iterable fieldNames + final Iterable fieldNames, + final boolean hasHeaderRow, + final int maxSkipHeaderRows ) { - this(delimiter, listDelimiter); + this(delimiter, listDelimiter, hasHeaderRow, maxSkipHeaderRows); setFieldNames(fieldNames); } - public DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) - + @VisibleForTesting + DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) { - this(delimiter, listDelimiter); + this(delimiter, listDelimiter, false, 0); setFieldNames(header); } @@ -106,6 +123,14 @@ public class DelimitedParser implements Parser return listDelimiter; } + @Override + public void startFileFromBeginning() + { + supportSkipHeaderRows = true; + hasParsedHeader = false; + skippedHeaderRows = 0; + } + @Override public List getFieldNames() { @@ -115,8 +140,10 @@ public class DelimitedParser implements Parser @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(String header) @@ -132,9 +159,27 @@ public class DelimitedParser implements Parser @Override public Map parse(final String input) { + if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { + throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. 
" + + "Please check the indexTask supports these options."); + } + try { Iterable values = splitter.split(input); + if (skippedHeaderRows < maxSkipHeaderRows) { + skippedHeaderRows++; + return null; + } + + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(values); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(Iterators.size(values.iterator()))); } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java index 2717d2db915..d23d599d3bc 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java @@ -238,7 +238,7 @@ public class JSONPathParser implements Parser /** * Specifies a field to be added to the parsed object Map, using JsonPath notation. - * + *

* See https://github.com/jayway/JsonPath for more information. */ public static class FieldSpec @@ -281,5 +281,4 @@ public class JSONPathParser implements Parser return expr; } } - } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java index 78b29de7224..8cc6fd6a1d6 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java @@ -19,6 +19,7 @@ package io.druid.java.util.common.parsers; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -28,11 +29,21 @@ import java.util.Map; public interface Parser { /** - * Parse a String into a Map. + * This method may or may not get called at the start of reading of every file depending on the type of IndexTasks. + * The parser state should be reset if exists. + */ + default void startFileFromBeginning() + { + + } + + /** + * Parse a String into a Map. The result can be null which means the given input string will be ignored. * * @throws ParseException if the String cannot be parsed */ - public Map parse(String input); + @Nullable + Map parse(String input); /** * Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be @@ -40,12 +51,12 @@ public interface Parser * parser) and those parsers have their own way of setting field names. */ @Deprecated - public void setFieldNames(Iterable fieldNames); + void setFieldNames(Iterable fieldNames); /** * Returns the fieldNames that we expect to see in parsed Maps, if known, or null otherwise. Deprecated; Parsers * should not, in general, be expected to know what fields they will return. */ @Deprecated - public List getFieldNames(); + List getFieldNames(); } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java index 87b6321aa20..329b02aa944 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java @@ -117,7 +117,6 @@ public class RegexParser implements Parser } @Override - public List getFieldNames() { return fieldNames; diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java index 7e45bf23d94..fede1aa1f98 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java @@ -53,6 +53,12 @@ public class ToLowerCaseParser implements Parser return retVal; } + @Override + public void startFileFromBeginning() + { + baseParser.startFileFromBeginning(); + } + @Override public void setFieldNames(Iterable fieldNames) { diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java index 8121fd9fd0c..37a589b276a 100644 --- a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java @@ -21,7 +21,7 @@ package io.druid.java.util.common.parsers; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import junit.framework.Assert; +import org.junit.Assert; import 
diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java
index 87b6321aa20..329b02aa944 100644
--- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java
+++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java
@@ -117,7 +117,6 @@ public class RegexParser implements Parser<String, Object>
   }
 
   @Override
-
   public List<String> getFieldNames()
   {
     return fieldNames;
diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java
index 7e45bf23d94..fede1aa1f98 100644
--- a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java
+++ b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java
@@ -53,6 +53,12 @@ public class ToLowerCaseParser implements Parser<String, Object>
     return retVal;
   }
 
+  @Override
+  public void startFileFromBeginning()
+  {
+    baseParser.startFileFromBeginning();
+  }
+
   @Override
   public void setFieldNames(Iterable<String> fieldNames)
   {
diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java
index 8121fd9fd0c..37a589b276a 100644
--- a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java
+++ b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java
@@ -21,7 +21,7 @@ package io.druid.java.util.common.parsers;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Map;
@@ -80,7 +80,7 @@ public class CSVParserTest
   @Test
   public void testCSVParserWithoutHeader()
   {
-    final Parser<String, Object> csvParser = new CSVParser(Optional.fromNullable(null));
+    final Parser<String, Object> csvParser = new CSVParser(Optional.fromNullable(null), false, 0);
     String body = "hello,world,foo";
     final Map<String, Object> jsonMap = csvParser.parse(body);
     Assert.assertEquals(
@@ -89,4 +89,48 @@ public class CSVParserTest
       jsonMap
     );
   }
+
+  @Test
+  public void testCSVParserWithSkipHeaderRows()
+  {
+    final int skipHeaderRows = 2;
+    final Parser<String, Object> csvParser = new CSVParser(
+        Optional.absent(),
+        false,
+        skipHeaderRows
+    );
+    csvParser.startFileFromBeginning();
+    final String[] body = new String[] {
+        "header,line,1",
+        "header,line,2",
+        "hello,world,foo"
+    };
+    int index;
+    for (index = 0; index < skipHeaderRows; index++) {
+      Assert.assertNull(csvParser.parse(body[index]));
+    }
+    final Map<String, Object> jsonMap = csvParser.parse(body[index]);
+    Assert.assertEquals(
+        "jsonMap",
+        ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"),
+        jsonMap
+    );
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCSVParserWithoutStartFileFromBeginning()
+  {
+    final int skipHeaderRows = 2;
+    final Parser<String, Object> csvParser = new CSVParser(
+        Optional.absent(),
+        false,
+        skipHeaderRows
+    );
+    final String[] body = new String[] {
+        "header,line,1",
+        "header,line,2",
+        "hello,world,foo"
+    };
+    csvParser.parse(body[0]);
+  }
 }
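The `@Test(expected = UnsupportedOperationException.class)` case pins down a guard: once `hasHeaderRow` or `skipHeaderRows` is set, calling `parse()` before `startFileFromBeginning()` fails fast instead of silently mis-parsing, presumably because a task that never signals file boundaries cannot support stateful header handling. A sketch of the expected call order, assuming only the three-argument `CSVParser` constructor exercised in these tests:

```java
import java.util.Map;

import com.google.common.base.Optional;
import io.druid.java.util.common.parsers.CSVParser;
import io.druid.java.util.common.parsers.Parser;

class CallOrderSketch
{
  static void demo()
  {
    // Assumes the CSVParser(listDelimiter, hasHeaderRow, skipHeaderRows)
    // constructor from the tests above.
    final Parser<String, Object> csvParser = new CSVParser(Optional.absent(), false, 2);
    csvParser.startFileFromBeginning();   // without this, parse() throws UnsupportedOperationException
    csvParser.parse("junk,row,1");        // returns null: skipped
    csvParser.parse("junk,row,2");        // returns null: skipped
    final Map<String, Object> row = csvParser.parse("hello,world,foo");
    // row is {column_1=hello, column_2=world, column_3=foo}, per the tests above
  }
}
```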
diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java
index 3ca58c67f67..d91ed25cbbc 100644
--- a/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java
+++ b/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java
@@ -21,7 +21,7 @@ package io.druid.java.util.common.parsers;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Map;
@@ -67,7 +67,11 @@
   public void testTSVParserWithHeader()
   {
     String header = "time\tvalue1\tvalue2";
-    final Parser<String, Object> delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.absent(), header);
+    final Parser<String, Object> delimitedParser = new DelimitedParser(
+        Optional.of("\t"),
+        Optional.absent(),
+        header
+    );
     String body = "hello\tworld\tfoo";
     final Map<String, Object> jsonMap = delimitedParser.parse(body);
     Assert.assertEquals(
@@ -80,7 +84,12 @@ public class DelimitedParserTest
   @Test
   public void testTSVParserWithoutHeader()
   {
-    final Parser<String, Object> delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.absent());
+    final Parser<String, Object> delimitedParser = new DelimitedParser(
+        Optional.of("\t"),
+        Optional.absent(),
+        false,
+        0
+    );
     String body = "hello\tworld\tfoo";
     final Map<String, Object> jsonMap = delimitedParser.parse(body);
     Assert.assertEquals(
@@ -89,4 +98,50 @@ public class DelimitedParserTest
       jsonMap
     );
   }
+
+  @Test
+  public void testTSVParserWithSkipHeaderRows()
+  {
+    final int skipHeaderRows = 2;
+    final Parser<String, Object> delimitedParser = new DelimitedParser(
+        Optional.of("\t"),
+        Optional.absent(),
+        false,
+        skipHeaderRows
+    );
+    delimitedParser.startFileFromBeginning();
+    final String[] body = new String[] {
+        "header\tline\t1",
+        "header\tline\t2",
+        "hello\tworld\tfoo"
+    };
+    int index;
+    for (index = 0; index < skipHeaderRows; index++) {
+      Assert.assertNull(delimitedParser.parse(body[index]));
+    }
+    final Map<String, Object> jsonMap = delimitedParser.parse(body[index]);
+    Assert.assertEquals(
+        "jsonMap",
+        ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"),
+        jsonMap
+    );
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testTSVParserWithoutStartFileFromBeginning()
+  {
+    final int skipHeaderRows = 2;
+    final Parser<String, Object> delimitedParser = new DelimitedParser(
+        Optional.of("\t"),
+        Optional.absent(),
+        false,
+        skipHeaderRows
+    );
+    final String[] body = new String[] {
+        "header\tline\t1",
+        "header\tline\t2",
+        "hello\tworld\tfoo"
+    };
+    delimitedParser.parse(body[0]);
+  }
 }
diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java
index d4fb1223ec8..614d45ff18b 100644
--- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java
@@ -130,7 +130,9 @@ public class MultiValuedDimensionTest
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null),
             "\t",
-            ImmutableList.of("timestamp", "product", "tags")
+            ImmutableList.of("timestamp", "product", "tags"),
+            false,
+            0
         ),
         "UTF-8"
     );
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index 26df8376601..edda6b9db3c 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
@@ -146,7 +146,9 @@ public class GroupByQueryRunnerFactoryTest
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null),
             "\t",
-            ImmutableList.of("timestamp", "product", "tags")
+            ImmutableList.of("timestamp", "product", "tags"),
+            false,
+            0
         ),
         "UTF-8"
     );
diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java
index 6f529768491..7a29b924860 100644
--- a/processing/src/test/java/io/druid/segment/TestIndex.java
+++ b/processing/src/test/java/io/druid/segment/TestIndex.java
@@ -291,7 +291,9 @@ public class TestIndex
             new DimensionsSpec(DIMENSION_SCHEMAS, null, null),
             "\t",
             "\u0001",
-            Arrays.asList(COLUMNS)
+            Arrays.asList(COLUMNS),
+            false,
+            0
         )
         , "utf8"
     );
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java
index 53a586ce4f2..08cd87ac089 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java
@@ -162,6 +162,16 @@ public class ReplayableFirehoseFactory implements FirehoseFactory