From 0514e5686e58e384a1b7b05ec0a89612040e65e5 Mon Sep 17 00:00:00 2001 From: Rye Date: Fri, 22 Nov 2019 18:01:40 -0800 Subject: [PATCH] add TsvInputFormat (#8915) * add TsvInputFormat * refactor code * fix grammar * use enum replace string literal * code refactor * code refactor * mark abstract for base class meant not to be instantiated * remove constructor for test --- .../apache/druid/data/input/InputFormat.java | 4 +- .../druid/data/input/impl/CSVParseSpec.java | 2 +- .../druid/data/input/impl/CsvInputFormat.java | 119 +------ .../druid/data/input/impl/CsvReader.java | 108 +----- .../input/impl/SeparateValueInputFormat.java | 195 +++++++++++ .../data/input/impl/SeparateValueReader.java | 157 +++++++++ .../druid/data/input/impl/TsvInputFormat.java | 41 +++ .../druid/data/input/impl/TsvReader.java | 52 +++ .../java/util/common/parsers/CSVParser.java | 4 +- .../data/input/impl/CsvInputFormatTest.java | 4 +- .../druid/data/input/impl/CsvReaderTest.java | 28 +- .../impl/InputEntityIteratingReaderTest.java | 1 + .../impl/TimedShutoffInputSourceTest.java | 2 +- .../data/input/impl/TsvInputFormatTest.java | 54 +++ .../druid/data/input/impl/TsvReaderTest.java | 308 ++++++++++++++++++ .../ParallelIndexSupervisorTaskSerdeTest.java | 2 +- .../sampler/CsvInputSourceSamplerTest.java | 2 +- .../sampler/InputSourceSamplerTest.java | 2 +- .../RecordSupplierInputSourceTest.java | 2 +- 19 files changed, 854 insertions(+), 233 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java b/core/src/main/java/org/apache/druid/data/input/InputFormat.java index cb6b495179b..d3671f5f7c4 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.data.input.impl.TsvInputFormat; import org.apache.druid.guice.annotations.UnstableApi; import java.io.File; @@ -45,7 +46,8 @@ import java.io.IOException; @JsonSubTypes(value = { @Type(name = "csv", value = CsvInputFormat.class), @Type(name = "json", value = JsonInputFormat.class), - @Type(name = "regex", value = RegexInputFormat.class) + @Type(name = "regex", value = RegexInputFormat.class), + @Type(name = "tsv", value = TsvInputFormat.class) }) public interface InputFormat { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java index 5340a427451..51ffd3831a9 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java @@ -100,7 +100,7 @@ public class CSVParseSpec extends ParseSpec @Override public InputFormat toInputFormat() { - return new CsvInputFormat(columns, listDelimiter, hasHeaderRow, skipHeaderRows); + return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index cf2a301a69e..28aa5c680de 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -21,29 +21,12 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.InputEntity; -import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.indexer.Checks; -import org.apache.druid.indexer.Property; import javax.annotation.Nullable; -import java.io.File; -import java.util.Collections; import java.util.List; -import java.util.Objects; -public class CsvInputFormat implements InputFormat +public class CsvInputFormat extends SeparateValueInputFormat { - private final String listDelimiter; - private final List columns; - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - @JsonCreator public CsvInputFormat( @JsonProperty("columns") @Nullable List columns, @@ -53,104 +36,6 @@ public class CsvInputFormat implements InputFormat @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - this.listDelimiter = listDelimiter; - this.columns = columns == null ? Collections.emptyList() : columns; - //noinspection ConstantConditions - this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("hasHeaderRow", hasHeaderRow), - new Property<>("findColumnsFromHeader", findColumnsFromHeader) - ) - ).getValue(); - this.skipHeaderRows = skipHeaderRows; - - if (!this.columns.isEmpty()) { - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - } else { - Preconditions.checkArgument( - this.findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." - ); - } - } - - @VisibleForTesting - public CsvInputFormat( - List columns, - String listDelimiter, - boolean findColumnsFromHeader, - int skipHeaderRows - ) - { - this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows); - } - - @JsonProperty - public List getColumns() - { - return columns; - } - - @JsonProperty - public String getListDelimiter() - { - return listDelimiter; - } - - @JsonProperty - public boolean isFindColumnsFromHeader() - { - return findColumnsFromHeader; - } - - @JsonProperty - public int getSkipHeaderRows() - { - return skipHeaderRows; - } - - @Override - public boolean isSplittable() - { - return true; - } - - @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) - { - return new CsvReader( - inputRowSchema, - source, - temporaryDirectory, - listDelimiter, - columns, - findColumnsFromHeader, - skipHeaderRows - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CsvInputFormat format = (CsvInputFormat) o; - return findColumnsFromHeader == format.findColumnsFromHeader && - skipHeaderRows == format.skipHeaderRows && - Objects.equals(listDelimiter, format.listDelimiter) && - Objects.equals(columns, format.columns); - } - - @Override - public int hashCode() - { - return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows); + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java index 966ce6958bd..92fe72d8248 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java @@ -19,49 +19,15 @@ package org.apache.druid.data.input.impl; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.opencsv.RFC4180Parser; -import com.opencsv.RFC4180ParserBuilder; -import com.opencsv.enums.CSVReaderNullFieldIndicator; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.TextReader; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.common.parsers.ParserUtils; -import org.apache.druid.java.util.common.parsers.Parsers; import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; -public class CsvReader extends TextReader +public class CsvReader extends SeparateValueReader { - private final RFC4180Parser parser = CsvReader.createOpenCsvParser(); - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - private final Function multiValueFunction; - @Nullable - private List columns; - - public static RFC4180Parser createOpenCsvParser() - { - return NullHandling.replaceWithDefault() - ? new RFC4180Parser() - : new RFC4180ParserBuilder().withFieldAsNull( - CSVReaderNullFieldIndicator.EMPTY_SEPARATORS).build(); - } - CsvReader( InputRowSchema inputRowSchema, InputEntity source, @@ -72,69 +38,15 @@ public class CsvReader extends TextReader int skipHeaderRows ) { - super(inputRowSchema, source, temporaryDirectory); - this.findColumnsFromHeader = findColumnsFromHeader; - this.skipHeaderRows = skipHeaderRows; - final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; - this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); - this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row - - if (this.columns != null) { - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - } else { - Preconditions.checkArgument( - findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." - ); - } - } - - @Override - public List parseInputRows(String line) throws IOException, ParseException - { - final Map zipped = parseLine(line); - return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); - } - - @Override - public Map toMap(String intermediateRow) throws IOException - { - return parseLine(intermediateRow); - } - - private Map parseLine(String line) throws IOException - { - final String[] parsed = parser.parseLine(line); - return Utils.zipMapPartial( - Preconditions.checkNotNull(columns, "columns"), - Iterables.transform(Arrays.asList(parsed), multiValueFunction) + super( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows, + SeparateValueInputFormat.Format.CSV ); } - - @Override - public int getNumHeaderLinesToSkip() - { - return skipHeaderRows; - } - - @Override - public boolean needsToProcessHeaderLine() - { - return findColumnsFromHeader; - } - - @Override - public void processHeaderLine(String line) throws IOException - { - if (!findColumnsFromHeader) { - throw new ISE("Don't call this if findColumnsFromHeader = false"); - } - columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); - if (columns.isEmpty()) { - throw new ISE("Empty columns"); - } - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java new file mode 100644 index 00000000000..93a13e8b803 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.indexer.Checks; +import org.apache.druid.indexer.Property; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * SeparateValueInputFormat abstracts the (Comma/Tab) Separate Value format of input data. + * It implements the common logic between {@link CsvInputFormat} and {@link TsvInputFormat} + * Should never be instantiated + */ +public abstract class SeparateValueInputFormat implements InputFormat +{ + + public enum Format + { + CSV(',', "comma"), + TSV('\t', "tab"); + + private final char delimiter; + private final String literal; + + Format(char delimiter, String literal) + { + this.delimiter = delimiter; + this.literal = literal; + } + + public String getDelimiterAsString() + { + return String.valueOf(delimiter); + } + + public char getDelimiter() + { + return delimiter; + } + + public String getLiteral() + { + return literal; + } + } + + private final String listDelimiter; + private final List columns; + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + private final Format format; + + protected SeparateValueInputFormat( + @Nullable List columns, + @Nullable String listDelimiter, + @Nullable Boolean hasHeaderRow, + @Nullable Boolean findColumnsFromHeader, + int skipHeaderRows, + Format format + ) + { + this.listDelimiter = listDelimiter; + this.columns = columns == null ? Collections.emptyList() : columns; + //noinspection ConstantConditions + this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("hasHeaderRow", hasHeaderRow), + new Property<>("findColumnsFromHeader", findColumnsFromHeader) + ) + ).getValue(); + this.skipHeaderRows = skipHeaderRows; + this.format = format; + + if (!this.columns.isEmpty()) { + for (String column : this.columns) { + Preconditions.checkArgument( + !column.contains(format.getDelimiterAsString()), + "Column[%s] has a " + format.getLiteral() + ", it cannot", + column + ); + } + } else { + Preconditions.checkArgument( + this.findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); + } + } + + + @JsonProperty + public List getColumns() + { + return columns; + } + + @JsonProperty + public String getListDelimiter() + { + return listDelimiter; + } + + @JsonProperty + public boolean isFindColumnsFromHeader() + { + return findColumnsFromHeader; + } + + @JsonProperty + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + + @Override + public boolean isSplittable() + { + return true; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return this.format == Format.TSV ? new TsvReader( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows + ) : new CsvReader( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SeparateValueInputFormat format = (SeparateValueInputFormat) o; + return findColumnsFromHeader == format.findColumnsFromHeader && + skipHeaderRows == format.skipHeaderRows && + Objects.equals(listDelimiter, format.listDelimiter) && + Objects.equals(columns, format.columns) && + Objects.equals(this.format, format.format); + } + + @Override + public int hashCode() + { + return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, format); + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java new file mode 100644 index 00000000000..9ed553c1d78 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.opencsv.RFC4180Parser; +import com.opencsv.RFC4180ParserBuilder; +import com.opencsv.enums.CSVReaderNullFieldIndicator; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.TextReader; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.common.parsers.ParserUtils; +import org.apache.druid.java.util.common.parsers.Parsers; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * SeparateValueReader abstracts the reader for (Comma/Tab) Separate Value format input data. + * It implements the common logic between {@link CsvReader} and {@link TsvReader} + * Should never be instantiated + */ +public abstract class SeparateValueReader extends TextReader +{ + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + private final Function multiValueFunction; + @Nullable + private List columns; + private final SeparateValueInputFormat.Format format; + private final RFC4180Parser parser; + + public static RFC4180Parser createOpenCsvParser(char separator) + { + return NullHandling.replaceWithDefault() + ? new RFC4180ParserBuilder() + .withSeparator(separator) + .build() + : new RFC4180ParserBuilder().withFieldAsNull( + CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) + .withSeparator(separator) + .build(); + } + + SeparateValueReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + @Nullable String listDelimiter, + @Nullable List columns, + boolean findColumnsFromHeader, + int skipHeaderRows, + SeparateValueInputFormat.Format format + ) + { + super(inputRowSchema, source, temporaryDirectory); + this.findColumnsFromHeader = findColumnsFromHeader; + this.skipHeaderRows = skipHeaderRows; + final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; + this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); + this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row + this.format = format; + this.parser = createOpenCsvParser(format.getDelimiter()); + + if (this.columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument( + !column.contains(format.getDelimiterAsString()), + "Column[%s] has a " + format.getLiteral() + ", it cannot", + column + ); + } + } else { + Preconditions.checkArgument( + findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); + } + } + + @Override + public List parseInputRows(String line) throws IOException, ParseException + { + final Map zipped = parseLine(line); + return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); + } + + @Override + public Map toMap(String intermediateRow) throws IOException + { + return parseLine(intermediateRow); + } + + private Map parseLine(String line) throws IOException + { + final String[] parsed = parser.parseLine(line); + return Utils.zipMapPartial( + Preconditions.checkNotNull(columns, "columns"), + Iterables.transform(Arrays.asList(parsed), multiValueFunction) + ); + } + + @Override + public int getNumHeaderLinesToSkip() + { + return skipHeaderRows; + } + + @Override + public boolean needsToProcessHeaderLine() + { + return findColumnsFromHeader; + } + + @Override + public void processHeaderLine(String line) throws IOException + { + if (!findColumnsFromHeader) { + throw new ISE("Don't call this if findColumnsFromHeader = false"); + } + columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); + if (columns.isEmpty()) { + throw new ISE("Empty columns"); + } + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java new file mode 100644 index 00000000000..8bec9536989 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.List; + +public class TsvInputFormat extends SeparateValueInputFormat +{ + @JsonCreator + public TsvInputFormat( + @JsonProperty("columns") @Nullable List columns, + @JsonProperty("listDelimiter") @Nullable String listDelimiter, + @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, + @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader, + @JsonProperty("skipHeaderRows") int skipHeaderRows + ) + { + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.TSV); + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java new file mode 100644 index 00000000000..961f61c634e --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRowSchema; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.List; + +public class TsvReader extends SeparateValueReader +{ + TsvReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + @Nullable String listDelimiter, + @Nullable List columns, + boolean findColumnsFromHeader, + int skipHeaderRows + ) + { + super( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows, + SeparateValueInputFormat.Format.TSV + ); + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java index 17eba147564..d026ad4eaed 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java @@ -21,7 +21,7 @@ package org.apache.druid.java.util.common.parsers; import com.google.common.annotations.VisibleForTesting; import com.opencsv.RFC4180Parser; -import org.apache.druid.data.input.impl.CsvReader; +import org.apache.druid.data.input.impl.SeparateValueReader; import javax.annotation.Nullable; import java.io.IOException; @@ -30,7 +30,7 @@ import java.util.List; public class CSVParser extends AbstractFlatTextFormatParser { - private final RFC4180Parser parser = CsvReader.createOpenCsvParser(); + private final RFC4180Parser parser = SeparateValueReader.createOpenCsvParser(','); public CSVParser( @Nullable final String listDelimiter, diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java index 8d2d688e986..b09ba86f09e 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java @@ -38,7 +38,7 @@ public class CsvInputFormatTest public void testSerde() throws IOException { final ObjectMapper mapper = new ObjectMapper(); - final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", true, 10); + final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", null, true, 10); final byte[] bytes = mapper.writeValueAsBytes(format); final CsvInputFormat fromJson = (CsvInputFormat) mapper.readValue(bytes, InputFormat.class); Assert.assertEquals(format, fromJson); @@ -49,6 +49,6 @@ public class CsvInputFormatTest { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Column[a,] has a comma, it cannot"); - new CsvInputFormat(Collections.singletonList("a,"), ",", false, 0); + new CsvInputFormat(Collections.singletonList("a,"), ",", null, false, 0); } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java index 653fcc4c114..ec942379f3b 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java @@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import java.io.ByteArrayOutputStream; @@ -48,6 +50,12 @@ public class CsvReaderTest Collections.emptyList() ); + @BeforeClass + public static void setup() + { + NullHandling.initializeForTests(); + } + @Test public void testWithoutHeaders() throws IOException { @@ -58,7 +66,7 @@ public class CsvReaderTest "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0); assertResult(source, format); } @@ -73,7 +81,7 @@ public class CsvReaderTest "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, null, true, 0); assertResult(source, format); } @@ -88,7 +96,7 @@ public class CsvReaderTest "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 1); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1); assertResult(source, format); } @@ -104,7 +112,7 @@ public class CsvReaderTest "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 1); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, null, true, 1); assertResult(source, format); } @@ -119,7 +127,7 @@ public class CsvReaderTest "2019-01-01T00:00:30Z,name_3,15|3" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", true, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", null, true, 0); final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); int numResults = 0; try (CloseableIterator iterator = reader.read()) { @@ -210,7 +218,13 @@ public class CsvReaderTest ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("Value", "Comment", "Timestamp"), null, false, 0); + final CsvInputFormat format = new CsvInputFormat( + ImmutableList.of("Value", "Comment", "Timestamp"), + null, + null, + false, + 0 + ); final InputEntityReader reader = format.createReader( new InputRowSchema( new TimestampSpec("Timestamp", "auto", null), @@ -238,7 +252,7 @@ public class CsvReaderTest "2019-01-01T00:00:10Z,name_1,\"Как говорится: \\\"\"всё течет, всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\"" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0); final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); try (CloseableIterator iterator = reader.read()) { Assert.assertTrue(iterator.hasNext()); diff --git a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index 6d55f14d103..7bde81c3cb3 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -69,6 +69,7 @@ public class InputEntityIteratingReaderTest new CsvInputFormat( ImmutableList.of("time", "name", "score"), null, + null, false, 0 ), diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java index 4f04e880d9b..2b4b2fc860a 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java @@ -43,7 +43,7 @@ public class TimedShutoffInputSourceTest new InlineInputSource("this,is,test\nthis,data,has\n3,rows,\n"), DateTimes.nowUtc().plusMillis(timeoutMs) ); - final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, false, 0); + final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, null, false, 0); final InputSourceReader reader = inputSource.reader( new InputRowSchema(new TimestampSpec(null, null, null), new DimensionsSpec(null), Collections.emptyList()), inputFormat, diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java new file mode 100644 index 00000000000..b160e4bc223 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputFormat; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collections; + +public class TsvInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + final TsvInputFormat format = new TsvInputFormat(Collections.singletonList("a"), "|", null, true, 10); + final byte[] bytes = mapper.writeValueAsBytes(format); + final TsvInputFormat fromJson = (TsvInputFormat) mapper.readValue(bytes, InputFormat.class); + Assert.assertEquals(format, fromJson); + } + + @Test + public void testTab() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Column[a\t] has a tab, it cannot"); + new TsvInputFormat(Collections.singletonList("a\t"), ",", null, false, 0); + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java new file mode 100644 index 00000000000..8f42e40ffae --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +public class TsvReaderTest +{ + private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))), + Collections.emptyList() + ); + + @BeforeClass + public static void setup() + { + NullHandling.initializeForTests(); + } + + @Test + public void testWithoutHeaders() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0); + assertResult(source, format); + } + + @Test + public void testFindColumn() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "ts\tname\tscore", + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 0); + assertResult(source, format); + } + + @Test + public void testSkipHeaders() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "this\tis\ta\trow\tto\tskip", + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1); + assertResult(source, format); + } + + @Test + public void testFindColumnAndSkipHeaders() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "this\tis\ta\trow\tto\tskip", + "ts\tname\tscore", + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 1); + assertResult(source, format); + } + + @Test + public void testMultiValues() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "ts\tname\tscore", + "2019-01-01T00:00:10Z\tname_1\t5|1", + "2019-01-01T00:00:20Z\tname_2\t10|2", + "2019-01-01T00:00:30Z\tname_3\t15|3" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), "|", null, true, 0); + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + int numResults = 0; + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + final InputRow row = iterator.next(); + Assert.assertEquals( + DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)), + row.getTimestamp() + ); + Assert.assertEquals( + StringUtils.format("name_%d", numResults + 1), + Iterables.getOnlyElement(row.getDimension("name")) + ); + Assert.assertEquals( + ImmutableList.of(Integer.toString((numResults + 1) * 5), Integer.toString(numResults + 1)), + row.getDimension("score") + ); + numResults++; + } + Assert.assertEquals(3, numResults); + } + } + + @Test + public void testQuotes() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "3\t\"Lets do some \"\"normal\"\" quotes\"\t2018-05-05T10:00:00Z", + "34\t\"Lets do some \"\"normal\"\", quotes with comma\"\t2018-05-06T10:00:00Z", + "343\t\"Lets try \\\"\"it\\\"\" with slash quotes\"\t2018-05-07T10:00:00Z", + "545\t\"Lets try \\\"\"it\\\"\", with slash quotes and comma\"\t2018-05-08T10:00:00Z", + "65\tHere I write \\n slash n\t2018-05-09T10:00:00Z" + ) + ); + final List expectedResults = ImmutableList.of( + new MapBasedInputRow( + DateTimes.of("2018-05-05T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "3", + "Comment", + "Lets do some \"normal\" quotes", + "Timestamp", + "2018-05-05T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-06T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "34", + "Comment", + "Lets do some \"normal\", quotes with comma", + "Timestamp", + "2018-05-06T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-07T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "343", + "Comment", + "Lets try \\\"it\\\" with slash quotes", + "Timestamp", + "2018-05-07T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-08T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "545", + "Comment", + "Lets try \\\"it\\\", with slash quotes and comma", + "Timestamp", + "2018-05-08T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-09T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") + ) + ); + final TsvInputFormat format = new TsvInputFormat( + ImmutableList.of("Value", "Comment", "Timestamp"), + null, + null, + false, + 0 + ); + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("Timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))), + Collections.emptyList() + ), + source, + null + ); + + try (CloseableIterator iterator = reader.read()) { + final Iterator expectedRowIterator = expectedResults.iterator(); + while (iterator.hasNext()) { + Assert.assertTrue(expectedRowIterator.hasNext()); + Assert.assertEquals(expectedRowIterator.next(), iterator.next()); + } + } + } + + @Test + public void testRussianTextMess() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "2019-01-01T00:00:10Z\tname_1\t\"Как говорится: \\\"\"всё течет\t всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\"" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0); + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + try (CloseableIterator iterator = reader.read()) { + Assert.assertTrue(iterator.hasNext()); + final InputRow row = iterator.next(); + Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp()); + Assert.assertEquals("name_1", Iterables.getOnlyElement(row.getDimension("name"))); + Assert.assertEquals( + "Как говорится: \\\"всё течет\t всё изменяется\\\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева", + Iterables.getOnlyElement(row.getDimension("Comment")) + ); + Assert.assertFalse(iterator.hasNext()); + } + } + + private ByteEntity writeData(List lines) throws IOException + { + final List byteLines = lines.stream() + .map(line -> StringUtils.toUtf8(line + "\n")) + .collect(Collectors.toList()); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream( + byteLines.stream().mapToInt(bytes -> bytes.length).sum() + ); + for (byte[] bytes : byteLines) { + outputStream.write(bytes); + } + return new ByteEntity(outputStream.toByteArray()); + } + + private void assertResult(ByteEntity source, TsvInputFormat format) throws IOException + { + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + int numResults = 0; + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + final InputRow row = iterator.next(); + Assert.assertEquals( + DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)), + row.getTimestamp() + ); + Assert.assertEquals( + StringUtils.format("name_%d", numResults + 1), + Iterables.getOnlyElement(row.getDimension("name")) + ); + Assert.assertEquals( + Integer.toString((numResults + 1) * 5), + Iterables.getOnlyElement(row.getDimension("score")) + ); + numResults++; + } + Assert.assertEquals(3, numResults); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index 97c6954e924..b087e25d823 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -217,7 +217,7 @@ public class ParallelIndexSupervisorTaskSerdeTest private final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( null, new LocalInputSource(new File("tmp"), "test_*"), - new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, false, 0), + new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, null, false, 0), false ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java index 753ad10326a..36ae63026c5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java @@ -58,7 +58,7 @@ public class CsvInputSourceSamplerTest "Michael,Jackson,,Male" ); final InputSource inputSource = new InlineInputSource(String.join("\n", strCsvRows)); - final InputFormat inputFormat = new CsvInputFormat(null, null, true, 0); + final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 0); final InputSourceSampler inputSourceSampler = new InputSourceSampler(); final SamplerResponse response = inputSourceSampler.sample( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index cbfadfad75f..ee6b9df9ab8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1090,7 +1090,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest case STR_JSON: return new JsonInputFormat(null, null); case STR_CSV: - return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, false, 0); + return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0); default: throw new IAE("Unknown parser type: %s", parserType); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java index fbd6a91bb87..e93eb10ebf2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java @@ -71,7 +71,7 @@ public class RecordSupplierInputSourceTest final List colNames = IntStream.range(0, NUM_COLS) .mapToObj(i -> StringUtils.format("col_%d", i)) .collect(Collectors.toList()); - final InputFormat inputFormat = new CsvInputFormat(colNames, null, false, 0); + final InputFormat inputFormat = new CsvInputFormat(colNames, null, null, false, 0); final InputSourceReader reader = inputSource.reader( new InputRowSchema( new TimestampSpec("col_0", "auto", null),