add TsvInputFormat (#8915)

* add TsvInputFormat

* refactor code

* fix grammar

* use enum replace string literal

* code refactor

* code refactor

* mark abstract for base class meant not to be instantiated

* remove constructor for test
This commit is contained in:
Rye 2019-11-22 18:01:40 -08:00 committed by Jonathan Wei
parent 7250010388
commit 0514e5686e
19 changed files with 854 additions and 233 deletions

View File

@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TsvInputFormat;
import org.apache.druid.guice.annotations.UnstableApi;
import java.io.File;
@ -45,7 +46,8 @@ import java.io.IOException;
@JsonSubTypes(value = {
@Type(name = "csv", value = CsvInputFormat.class),
@Type(name = "json", value = JsonInputFormat.class),
@Type(name = "regex", value = RegexInputFormat.class)
@Type(name = "regex", value = RegexInputFormat.class),
@Type(name = "tsv", value = TsvInputFormat.class)
})
public interface InputFormat
{

View File

@ -100,7 +100,7 @@ public class CSVParseSpec extends ParseSpec
@Override
public InputFormat toInputFormat()
{
return new CsvInputFormat(columns, listDelimiter, hasHeaderRow, skipHeaderRows);
return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows);
}
@Override

View File

@ -21,29 +21,12 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class CsvInputFormat implements InputFormat
public class CsvInputFormat extends SeparateValueInputFormat
{
private final String listDelimiter;
private final List<String> columns;
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
@JsonCreator
public CsvInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
@ -53,104 +36,6 @@ public class CsvInputFormat implements InputFormat
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
this.listDelimiter = listDelimiter;
this.columns = columns == null ? Collections.emptyList() : columns;
//noinspection ConstantConditions
this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("hasHeaderRow", hasHeaderRow),
new Property<>("findColumnsFromHeader", findColumnsFromHeader)
)
).getValue();
this.skipHeaderRows = skipHeaderRows;
if (!this.columns.isEmpty()) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
} else {
Preconditions.checkArgument(
this.findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}
@VisibleForTesting
public CsvInputFormat(
List<String> columns,
String listDelimiter,
boolean findColumnsFromHeader,
int skipHeaderRows
)
{
this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows);
}
@JsonProperty
public List<String> getColumns()
{
return columns;
}
@JsonProperty
public String getListDelimiter()
{
return listDelimiter;
}
@JsonProperty
public boolean isFindColumnsFromHeader()
{
return findColumnsFromHeader;
}
@JsonProperty
public int getSkipHeaderRows()
{
return skipHeaderRows;
}
@Override
public boolean isSplittable()
{
return true;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new CsvReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CsvInputFormat format = (CsvInputFormat) o;
return findColumnsFromHeader == format.findColumnsFromHeader &&
skipHeaderRows == format.skipHeaderRows &&
Objects.equals(listDelimiter, format.listDelimiter) &&
Objects.equals(columns, format.columns);
}
@Override
public int hashCode()
{
return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows);
super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV);
}
}

View File

@ -19,49 +19,15 @@
package org.apache.druid.data.input.impl;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.TextReader;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;
import org.apache.druid.java.util.common.parsers.Parsers;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class CsvReader extends TextReader
public class CsvReader extends SeparateValueReader
{
private final RFC4180Parser parser = CsvReader.createOpenCsvParser();
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Function<String, Object> multiValueFunction;
@Nullable
private List<String> columns;
public static RFC4180Parser createOpenCsvParser()
{
return NullHandling.replaceWithDefault()
? new RFC4180Parser()
: new RFC4180ParserBuilder().withFieldAsNull(
CSVReaderNullFieldIndicator.EMPTY_SEPARATORS).build();
}
CsvReader(
InputRowSchema inputRowSchema,
InputEntity source,
@ -72,69 +38,15 @@ public class CsvReader extends TextReader
int skipHeaderRows
)
{
super(inputRowSchema, source, temporaryDirectory);
this.findColumnsFromHeader = findColumnsFromHeader;
this.skipHeaderRows = skipHeaderRows;
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row
if (this.columns != null) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
} else {
Preconditions.checkArgument(
super(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
skipHeaderRows,
SeparateValueInputFormat.Format.CSV
);
}
}
@Override
public List<InputRow> parseInputRows(String line) throws IOException, ParseException
{
final Map<String, Object> zipped = parseLine(line);
return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped));
}
@Override
public Map<String, Object> toMap(String intermediateRow) throws IOException
{
return parseLine(intermediateRow);
}
private Map<String, Object> parseLine(String line) throws IOException
{
final String[] parsed = parser.parseLine(line);
return Utils.zipMapPartial(
Preconditions.checkNotNull(columns, "columns"),
Iterables.transform(Arrays.asList(parsed), multiValueFunction)
);
}
@Override
public int getNumHeaderLinesToSkip()
{
return skipHeaderRows;
}
@Override
public boolean needsToProcessHeaderLine()
{
return findColumnsFromHeader;
}
@Override
public void processHeaderLine(String line) throws IOException
{
if (!findColumnsFromHeader) {
throw new ISE("Don't call this if findColumnsFromHeader = false");
}
columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line)));
if (columns.isEmpty()) {
throw new ISE("Empty columns");
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* SeparateValueInputFormat abstracts the (Comma/Tab) Separate Value format of input data.
* It implements the common logic between {@link CsvInputFormat} and {@link TsvInputFormat}
* Should never be instantiated
*/
public abstract class SeparateValueInputFormat implements InputFormat
{
public enum Format
{
CSV(',', "comma"),
TSV('\t', "tab");
private final char delimiter;
private final String literal;
Format(char delimiter, String literal)
{
this.delimiter = delimiter;
this.literal = literal;
}
public String getDelimiterAsString()
{
return String.valueOf(delimiter);
}
public char getDelimiter()
{
return delimiter;
}
public String getLiteral()
{
return literal;
}
}
private final String listDelimiter;
private final List<String> columns;
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Format format;
protected SeparateValueInputFormat(
@Nullable List<String> columns,
@Nullable String listDelimiter,
@Nullable Boolean hasHeaderRow,
@Nullable Boolean findColumnsFromHeader,
int skipHeaderRows,
Format format
)
{
this.listDelimiter = listDelimiter;
this.columns = columns == null ? Collections.emptyList() : columns;
//noinspection ConstantConditions
this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("hasHeaderRow", hasHeaderRow),
new Property<>("findColumnsFromHeader", findColumnsFromHeader)
)
).getValue();
this.skipHeaderRows = skipHeaderRows;
this.format = format;
if (!this.columns.isEmpty()) {
for (String column : this.columns) {
Preconditions.checkArgument(
!column.contains(format.getDelimiterAsString()),
"Column[%s] has a " + format.getLiteral() + ", it cannot",
column
);
}
} else {
Preconditions.checkArgument(
this.findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}
@JsonProperty
public List<String> getColumns()
{
return columns;
}
@JsonProperty
public String getListDelimiter()
{
return listDelimiter;
}
@JsonProperty
public boolean isFindColumnsFromHeader()
{
return findColumnsFromHeader;
}
@JsonProperty
public int getSkipHeaderRows()
{
return skipHeaderRows;
}
@Override
public boolean isSplittable()
{
return true;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return this.format == Format.TSV ? new TsvReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows
) : new CsvReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SeparateValueInputFormat format = (SeparateValueInputFormat) o;
return findColumnsFromHeader == format.findColumnsFromHeader &&
skipHeaderRows == format.skipHeaderRows &&
Objects.equals(listDelimiter, format.listDelimiter) &&
Objects.equals(columns, format.columns) &&
Objects.equals(this.format, format.format);
}
@Override
public int hashCode()
{
return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, format);
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.TextReader;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;
import org.apache.druid.java.util.common.parsers.Parsers;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* SeparateValueReader abstracts the reader for (Comma/Tab) Separate Value format input data.
* It implements the common logic between {@link CsvReader} and {@link TsvReader}
* Should never be instantiated
*/
public abstract class SeparateValueReader extends TextReader
{
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Function<String, Object> multiValueFunction;
@Nullable
private List<String> columns;
private final SeparateValueInputFormat.Format format;
private final RFC4180Parser parser;
public static RFC4180Parser createOpenCsvParser(char separator)
{
return NullHandling.replaceWithDefault()
? new RFC4180ParserBuilder()
.withSeparator(separator)
.build()
: new RFC4180ParserBuilder().withFieldAsNull(
CSVReaderNullFieldIndicator.EMPTY_SEPARATORS)
.withSeparator(separator)
.build();
}
SeparateValueReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
@Nullable String listDelimiter,
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows,
SeparateValueInputFormat.Format format
)
{
super(inputRowSchema, source, temporaryDirectory);
this.findColumnsFromHeader = findColumnsFromHeader;
this.skipHeaderRows = skipHeaderRows;
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row
this.format = format;
this.parser = createOpenCsvParser(format.getDelimiter());
if (this.columns != null) {
for (String column : this.columns) {
Preconditions.checkArgument(
!column.contains(format.getDelimiterAsString()),
"Column[%s] has a " + format.getLiteral() + ", it cannot",
column
);
}
} else {
Preconditions.checkArgument(
findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}
@Override
public List<InputRow> parseInputRows(String line) throws IOException, ParseException
{
final Map<String, Object> zipped = parseLine(line);
return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped));
}
@Override
public Map<String, Object> toMap(String intermediateRow) throws IOException
{
return parseLine(intermediateRow);
}
private Map<String, Object> parseLine(String line) throws IOException
{
final String[] parsed = parser.parseLine(line);
return Utils.zipMapPartial(
Preconditions.checkNotNull(columns, "columns"),
Iterables.transform(Arrays.asList(parsed), multiValueFunction)
);
}
@Override
public int getNumHeaderLinesToSkip()
{
return skipHeaderRows;
}
@Override
public boolean needsToProcessHeaderLine()
{
return findColumnsFromHeader;
}
@Override
public void processHeaderLine(String line) throws IOException
{
if (!findColumnsFromHeader) {
throw new ISE("Don't call this if findColumnsFromHeader = false");
}
columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line)));
if (columns.isEmpty()) {
throw new ISE("Empty columns");
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.List;
public class TsvInputFormat extends SeparateValueInputFormat
{
@JsonCreator
public TsvInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
@JsonProperty("listDelimiter") @Nullable String listDelimiter,
@Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow,
@JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.TSV);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRowSchema;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
public class TsvReader extends SeparateValueReader
{
TsvReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
@Nullable String listDelimiter,
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows
)
{
super(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows,
SeparateValueInputFormat.Format.TSV
);
}
}

View File

@ -21,7 +21,7 @@ package org.apache.druid.java.util.common.parsers;
import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import org.apache.druid.data.input.impl.CsvReader;
import org.apache.druid.data.input.impl.SeparateValueReader;
import javax.annotation.Nullable;
import java.io.IOException;
@ -30,7 +30,7 @@ import java.util.List;
public class CSVParser extends AbstractFlatTextFormatParser
{
private final RFC4180Parser parser = CsvReader.createOpenCsvParser();
private final RFC4180Parser parser = SeparateValueReader.createOpenCsvParser(',');
public CSVParser(
@Nullable final String listDelimiter,

View File

@ -38,7 +38,7 @@ public class CsvInputFormatTest
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", true, 10);
final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", null, true, 10);
final byte[] bytes = mapper.writeValueAsBytes(format);
final CsvInputFormat fromJson = (CsvInputFormat) mapper.readValue(bytes, InputFormat.class);
Assert.assertEquals(format, fromJson);
@ -49,6 +49,6 @@ public class CsvInputFormatTest
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Column[a,] has a comma, it cannot");
new CsvInputFormat(Collections.singletonList("a,"), ",", false, 0);
new CsvInputFormat(Collections.singletonList("a,"), ",", null, false, 0);
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
@ -48,6 +50,12 @@ public class CsvReaderTest
Collections.emptyList()
);
@BeforeClass
public static void setup()
{
NullHandling.initializeForTests();
}
@Test
public void testWithoutHeaders() throws IOException
{
@ -58,7 +66,7 @@ public class CsvReaderTest
"2019-01-01T00:00:30Z,name_3,15"
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 0);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0);
assertResult(source, format);
}
@ -73,7 +81,7 @@ public class CsvReaderTest
"2019-01-01T00:00:30Z,name_3,15"
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 0);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, null, true, 0);
assertResult(source, format);
}
@ -88,7 +96,7 @@ public class CsvReaderTest
"2019-01-01T00:00:30Z,name_3,15"
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 1);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1);
assertResult(source, format);
}
@ -104,7 +112,7 @@ public class CsvReaderTest
"2019-01-01T00:00:30Z,name_3,15"
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 1);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, null, true, 1);
assertResult(source, format);
}
@ -119,7 +127,7 @@ public class CsvReaderTest
"2019-01-01T00:00:30Z,name_3,15|3"
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", true, 0);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", null, true, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
@ -210,7 +218,13 @@ public class CsvReaderTest
ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z")
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("Value", "Comment", "Timestamp"), null, false, 0);
final CsvInputFormat format = new CsvInputFormat(
ImmutableList.of("Value", "Comment", "Timestamp"),
null,
null,
false,
0
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("Timestamp", "auto", null),
@ -238,7 +252,7 @@ public class CsvReaderTest
"2019-01-01T00:00:10Z,name_1,\"Как говорится: \\\"\"всё течет, всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\""
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());

View File

@ -69,6 +69,7 @@ public class InputEntityIteratingReaderTest
new CsvInputFormat(
ImmutableList.of("time", "name", "score"),
null,
null,
false,
0
),

View File

@ -43,7 +43,7 @@ public class TimedShutoffInputSourceTest
new InlineInputSource("this,is,test\nthis,data,has\n3,rows,\n"),
DateTimes.nowUtc().plusMillis(timeoutMs)
);
final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, false, 0);
final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, null, false, 0);
final InputSourceReader reader = inputSource.reader(
new InputRowSchema(new TimestampSpec(null, null, null), new DimensionsSpec(null), Collections.emptyList()),
inputFormat,

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputFormat;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collections;
public class TsvInputFormatTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
final TsvInputFormat format = new TsvInputFormat(Collections.singletonList("a"), "|", null, true, 10);
final byte[] bytes = mapper.writeValueAsBytes(format);
final TsvInputFormat fromJson = (TsvInputFormat) mapper.readValue(bytes, InputFormat.class);
Assert.assertEquals(format, fromJson);
}
@Test
public void testTab()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Column[a\t] has a tab, it cannot");
new TsvInputFormat(Collections.singletonList("a\t"), ",", null, false, 0);
}
}

View File

@ -0,0 +1,308 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
public class TsvReaderTest
{
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
Collections.emptyList()
);
@BeforeClass
public static void setup()
{
NullHandling.initializeForTests();
}
@Test
public void testWithoutHeaders() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"2019-01-01T00:00:10Z\tname_1\t5",
"2019-01-01T00:00:20Z\tname_2\t10",
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0);
assertResult(source, format);
}
@Test
public void testFindColumn() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"ts\tname\tscore",
"2019-01-01T00:00:10Z\tname_1\t5",
"2019-01-01T00:00:20Z\tname_2\t10",
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 0);
assertResult(source, format);
}
@Test
public void testSkipHeaders() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"this\tis\ta\trow\tto\tskip",
"2019-01-01T00:00:10Z\tname_1\t5",
"2019-01-01T00:00:20Z\tname_2\t10",
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1);
assertResult(source, format);
}
@Test
public void testFindColumnAndSkipHeaders() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"this\tis\ta\trow\tto\tskip",
"ts\tname\tscore",
"2019-01-01T00:00:10Z\tname_1\t5",
"2019-01-01T00:00:20Z\tname_2\t10",
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 1);
assertResult(source, format);
}
@Test
public void testMultiValues() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"ts\tname\tscore",
"2019-01-01T00:00:10Z\tname_1\t5|1",
"2019-01-01T00:00:20Z\tname_2\t10|2",
"2019-01-01T00:00:30Z\tname_3\t15|3"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), "|", null, true, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(
DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)),
row.getTimestamp()
);
Assert.assertEquals(
StringUtils.format("name_%d", numResults + 1),
Iterables.getOnlyElement(row.getDimension("name"))
);
Assert.assertEquals(
ImmutableList.of(Integer.toString((numResults + 1) * 5), Integer.toString(numResults + 1)),
row.getDimension("score")
);
numResults++;
}
Assert.assertEquals(3, numResults);
}
}
@Test
public void testQuotes() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"3\t\"Lets do some \"\"normal\"\" quotes\"\t2018-05-05T10:00:00Z",
"34\t\"Lets do some \"\"normal\"\", quotes with comma\"\t2018-05-06T10:00:00Z",
"343\t\"Lets try \\\"\"it\\\"\" with slash quotes\"\t2018-05-07T10:00:00Z",
"545\t\"Lets try \\\"\"it\\\"\", with slash quotes and comma\"\t2018-05-08T10:00:00Z",
"65\tHere I write \\n slash n\t2018-05-09T10:00:00Z"
)
);
final List<InputRow> expectedResults = ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2018-05-05T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
"3",
"Comment",
"Lets do some \"normal\" quotes",
"Timestamp",
"2018-05-05T10:00:00Z"
)
),
new MapBasedInputRow(
DateTimes.of("2018-05-06T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
"34",
"Comment",
"Lets do some \"normal\", quotes with comma",
"Timestamp",
"2018-05-06T10:00:00Z"
)
),
new MapBasedInputRow(
DateTimes.of("2018-05-07T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
"343",
"Comment",
"Lets try \\\"it\\\" with slash quotes",
"Timestamp",
"2018-05-07T10:00:00Z"
)
),
new MapBasedInputRow(
DateTimes.of("2018-05-08T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
"545",
"Comment",
"Lets try \\\"it\\\", with slash quotes and comma",
"Timestamp",
"2018-05-08T10:00:00Z"
)
),
new MapBasedInputRow(
DateTimes.of("2018-05-09T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z")
)
);
final TsvInputFormat format = new TsvInputFormat(
ImmutableList.of("Value", "Comment", "Timestamp"),
null,
null,
false,
0
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("Timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))),
Collections.emptyList()
),
source,
null
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
final Iterator<InputRow> expectedRowIterator = expectedResults.iterator();
while (iterator.hasNext()) {
Assert.assertTrue(expectedRowIterator.hasNext());
Assert.assertEquals(expectedRowIterator.next(), iterator.next());
}
}
}
@Test
public void testRussianTextMess() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"2019-01-01T00:00:10Z\tname_1\t\"Как говорится: \\\"\"всё течет\t всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\""
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp());
Assert.assertEquals("name_1", Iterables.getOnlyElement(row.getDimension("name")));
Assert.assertEquals(
"Как говорится: \\\"всё течет\t всё изменяется\\\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева",
Iterables.getOnlyElement(row.getDimension("Comment"))
);
Assert.assertFalse(iterator.hasNext());
}
}
private ByteEntity writeData(List<String> lines) throws IOException
{
final List<byte[]> byteLines = lines.stream()
.map(line -> StringUtils.toUtf8(line + "\n"))
.collect(Collectors.toList());
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(
byteLines.stream().mapToInt(bytes -> bytes.length).sum()
);
for (byte[] bytes : byteLines) {
outputStream.write(bytes);
}
return new ByteEntity(outputStream.toByteArray());
}
private void assertResult(ByteEntity source, TsvInputFormat format) throws IOException
{
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(
DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)),
row.getTimestamp()
);
Assert.assertEquals(
StringUtils.format("name_%d", numResults + 1),
Iterables.getOnlyElement(row.getDimension("name"))
);
Assert.assertEquals(
Integer.toString((numResults + 1) * 5),
Iterables.getOnlyElement(row.getDimension("score"))
);
numResults++;
}
Assert.assertEquals(3, numResults);
}
}
}

View File

@ -217,7 +217,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
private final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
null,
new LocalInputSource(new File("tmp"), "test_*"),
new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, false, 0),
new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, null, false, 0),
false
);

View File

@ -58,7 +58,7 @@ public class CsvInputSourceSamplerTest
"Michael,Jackson,,Male"
);
final InputSource inputSource = new InlineInputSource(String.join("\n", strCsvRows));
final InputFormat inputFormat = new CsvInputFormat(null, null, true, 0);
final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 0);
final InputSourceSampler inputSourceSampler = new InputSourceSampler();
final SamplerResponse response = inputSourceSampler.sample(

View File

@ -1090,7 +1090,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
case STR_JSON:
return new JsonInputFormat(null, null);
case STR_CSV:
return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, false, 0);
return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0);
default:
throw new IAE("Unknown parser type: %s", parserType);
}

View File

@ -71,7 +71,7 @@ public class RecordSupplierInputSourceTest
final List<String> colNames = IntStream.range(0, NUM_COLS)
.mapToObj(i -> StringUtils.format("col_%d", i))
.collect(Collectors.toList());
final InputFormat inputFormat = new CsvInputFormat(colNames, null, false, 0);
final InputFormat inputFormat = new CsvInputFormat(colNames, null, null, false, 0);
final InputSourceReader reader = inputSource.reader(
new InputRowSchema(
new TimestampSpec("col_0", "auto", null),