add customize separator for TSV inputFormat (#8993)

* add customize separator for TSV inputFormat

* fix spotbug

* code refactor

* code refactor

* add argument check for delimiter

* refine null check

* add check for delimiter and listdelimiter can not be same

* add unit tests
This commit is contained in:
Rye 2019-12-09 11:24:09 -08:00 committed by Jonathan Wei
parent 1c62987783
commit ca77d576c6
12 changed files with 177 additions and 205 deletions

View File

@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DelimitedInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TsvInputFormat;
import org.apache.druid.guice.annotations.UnstableApi;
import java.io.File;
@ -38,7 +38,7 @@ import java.io.IOException;
* InputFormat abstracts the file format of input data.
* It creates a {@link InputEntityReader} to read data and parse it into {@link InputRow}.
* The created InputEntityReader is used by {@link InputSourceReader}.
*
* <p>
* See {@link NestedInputFormat} for nested input formats such as JSON.
*/
@UnstableApi
@ -47,13 +47,13 @@ import java.io.IOException;
@Type(name = "csv", value = CsvInputFormat.class),
@Type(name = "json", value = JsonInputFormat.class),
@Type(name = "regex", value = RegexInputFormat.class),
@Type(name = "tsv", value = TsvInputFormat.class)
@Type(name = "tsv", value = DelimitedInputFormat.class)
})
public interface InputFormat
{
/**
* Trait to indicate that a file can be split into multiple {@link InputSplit}s.
*
* <p>
* This method is not being used anywhere for now, but should be considered
* in {@link SplittableInputSource#createSplits} in the future.
*/

View File

@ -30,7 +30,7 @@ import java.util.NoSuchElementException;
/**
* {@link InputEntityReader} that parses bytes into some intermediate rows first, and then into {@link InputRow}s.
* For example, {@link org.apache.druid.data.input.impl.CsvReader} parses bytes into string lines, and then parses
* For example, {@link org.apache.druid.data.input.impl.DelimitedValueReader} parses bytes into string lines, and then parses
* those lines into InputRows.
*
* @param <T> type of intermediate row. For example, it can be {@link String} for text formats.

View File

@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.List;
public class CsvInputFormat extends SeparateValueInputFormat
public class CsvInputFormat extends DelimitedInputFormat
{
@JsonCreator
public CsvInputFormat(
@ -36,6 +36,6 @@ public class CsvInputFormat extends SeparateValueInputFormat
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV);
super(columns, listDelimiter, ",", hasHeaderRow, findColumnsFromHeader, skipHeaderRows);
}
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRowSchema;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
public class CsvReader extends SeparateValueReader
{
CsvReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
@Nullable String listDelimiter,
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows
)
{
super(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows,
SeparateValueInputFormat.Format.CSV
);
}
}

View File

@ -36,20 +36,19 @@ import java.util.List;
import java.util.Objects;
/**
* SeparateValueInputFormat abstracts the (Comma/Tab) Separate Value format of input data.
* It implements the common logic between {@link CsvInputFormat} and {@link TsvInputFormat}
* Should never be instantiated
* InputFormat for customized Delimitor Separate Value format of input data(default is TSV).
*/
public abstract class SeparateValueInputFormat implements InputFormat
public class DelimitedInputFormat implements InputFormat
{
public enum Format
{
CSV(',', "comma"),
TSV('\t', "tab");
TSV('\t', "tab"),
CustomizeSV('|', "");
private final char delimiter;
private final String literal;
private char delimiter;
private String literal;
Format(char delimiter, String literal)
{
@ -62,6 +61,12 @@ public abstract class SeparateValueInputFormat implements InputFormat
return String.valueOf(delimiter);
}
private void setDelimiter(String delimiter, String literal)
{
this.delimiter = (delimiter != null && delimiter.length() > 0) ? delimiter.charAt(0) : '\t';
this.literal = literal != null ? literal : "customize separator: " + delimiter;
}
public char getDelimiter()
{
return delimiter;
@ -78,14 +83,15 @@ public abstract class SeparateValueInputFormat implements InputFormat
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Format format;
private final String delimiter;
protected SeparateValueInputFormat(
@Nullable List<String> columns,
@Nullable String listDelimiter,
@Nullable Boolean hasHeaderRow,
@Nullable Boolean findColumnsFromHeader,
int skipHeaderRows,
Format format
public DelimitedInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
@JsonProperty("listDelimiter") @Nullable String listDelimiter,
@JsonProperty("delimiter") @Nullable String delimiter,
@Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow,
@JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
this.listDelimiter = listDelimiter;
@ -98,8 +104,17 @@ public abstract class SeparateValueInputFormat implements InputFormat
)
).getValue();
this.skipHeaderRows = skipHeaderRows;
this.format = format;
this.delimiter = delimiter == null ? "\t" : delimiter;
this.format = getFormat(this.delimiter);
Preconditions.checkArgument(
this.delimiter.length() == 1,
"The delimiter should be a single character"
);
Preconditions.checkArgument(
!this.delimiter.equals(listDelimiter),
"Cannot have same delimiter and list delimiter of [%s]",
this.delimiter
);
if (!this.columns.isEmpty()) {
for (String column : this.columns) {
Preconditions.checkArgument(
@ -117,6 +132,18 @@ public abstract class SeparateValueInputFormat implements InputFormat
}
}
private static Format getFormat(String delimiter)
{
if (",".equals(delimiter)) {
return Format.CSV;
} else if ("\t".equals(delimiter)) {
return Format.TSV;
} else {
Format.CustomizeSV.setDelimiter(delimiter, null);
return Format.CustomizeSV;
}
}
@JsonProperty
public List<String> getColumns()
@ -151,22 +178,15 @@ public abstract class SeparateValueInputFormat implements InputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return this.format == Format.TSV ? new TsvReader(
return new DelimitedValueReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows
) : new CsvReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows
skipHeaderRows,
this.format
);
}
@ -179,7 +199,7 @@ public abstract class SeparateValueInputFormat implements InputFormat
if (o == null || getClass() != o.getClass()) {
return false;
}
SeparateValueInputFormat format = (SeparateValueInputFormat) o;
DelimitedInputFormat format = (DelimitedInputFormat) o;
return findColumnsFromHeader == format.findColumnsFromHeader &&
skipHeaderRows == format.skipHeaderRows &&
Objects.equals(listDelimiter, format.listDelimiter) &&

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Iterables;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -45,29 +46,30 @@ import java.util.List;
import java.util.Map;
/**
* SeparateValueReader abstracts the reader for (Comma/Tab) Separate Value format input data.
* It implements the common logic between {@link CsvReader} and {@link TsvReader}
* Should never be instantiated
* DelimitedValueReader is the reader for Delimitor Separate Value format input data(CSV/TSV).
*/
public abstract class SeparateValueReader extends TextReader
public class DelimitedValueReader extends TextReader
{
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Function<String, Object> multiValueFunction;
@Nullable
private List<String> columns;
private final SeparateValueInputFormat.Format format;
private final RFC4180Parser parser;
public static RFC4180Parser createOpenCsvParser(char separator)
{
return new RFC4180ParserBuilder().withFieldAsNull(
return NullHandling.replaceWithDefault()
? new RFC4180ParserBuilder()
.withSeparator(separator)
.build()
: new RFC4180ParserBuilder().withFieldAsNull(
CSVReaderNullFieldIndicator.EMPTY_SEPARATORS)
.withSeparator(separator)
.build();
}
SeparateValueReader(
DelimitedValueReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
@ -75,7 +77,7 @@ public abstract class SeparateValueReader extends TextReader
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows,
SeparateValueInputFormat.Format format
DelimitedInputFormat.Format format
)
{
super(inputRowSchema, source, temporaryDirectory);
@ -84,7 +86,6 @@ public abstract class SeparateValueReader extends TextReader
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row
this.format = format;
this.parser = createOpenCsvParser(format.getDelimiter());
if (this.columns != null) {

View File

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import java.util.List;
public class TsvInputFormat extends SeparateValueInputFormat
{
@JsonCreator
public TsvInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
@JsonProperty("listDelimiter") @Nullable String listDelimiter,
@Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow,
@JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.TSV);
}
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRowSchema;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
public class TsvReader extends SeparateValueReader
{
TsvReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
@Nullable String listDelimiter,
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows
)
{
super(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows,
SeparateValueInputFormat.Format.TSV
);
}
}

View File

@ -21,7 +21,7 @@ package org.apache.druid.java.util.common.parsers;
import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import org.apache.druid.data.input.impl.SeparateValueReader;
import org.apache.druid.data.input.impl.DelimitedValueReader;
import javax.annotation.Nullable;
import java.io.IOException;
@ -30,7 +30,7 @@ import java.util.List;
public class CSVParser extends AbstractFlatTextFormatParser
{
private final RFC4180Parser parser = SeparateValueReader.createOpenCsvParser(',');
private final RFC4180Parser parser = DelimitedValueReader.createOpenCsvParser(',');
public CSVParser(
@Nullable final String listDelimiter,

View File

@ -49,6 +49,14 @@ public class CsvInputFormatTest
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Column[a,] has a comma, it cannot");
new CsvInputFormat(Collections.singletonList("a,"), ",", null, false, 0);
new CsvInputFormat(Collections.singletonList("a,"), "|", null, false, 0);
}
@Test
public void testDelimiter()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot have same delimiter and list delimiter of [,]");
new CsvInputFormat(Collections.singletonList("a\t"), ",", null, false, 0);
}
}

View File

@ -29,18 +29,25 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collections;
public class TsvInputFormatTest
public class DelimitedInputFormatTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
final TsvInputFormat format = new TsvInputFormat(Collections.singletonList("a"), "|", null, true, 10);
final DelimitedInputFormat format = new DelimitedInputFormat(
Collections.singletonList("a"),
"|",
null,
null,
true,
10
);
final byte[] bytes = mapper.writeValueAsBytes(format);
final TsvInputFormat fromJson = (TsvInputFormat) mapper.readValue(bytes, InputFormat.class);
final DelimitedInputFormat fromJson = (DelimitedInputFormat) mapper.readValue(bytes, InputFormat.class);
Assert.assertEquals(format, fromJson);
}
@ -49,6 +56,30 @@ public class TsvInputFormatTest
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Column[a\t] has a tab, it cannot");
new TsvInputFormat(Collections.singletonList("a\t"), ",", null, false, 0);
new DelimitedInputFormat(Collections.singletonList("a\t"), ",", null, null, false, 0);
}
@Test
public void testDelimiterLength()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("The delimiter should be a single character");
new DelimitedInputFormat(Collections.singletonList("a\t"), ",", "null", null, false, 0);
}
@Test
public void testDelimiterAndListDelimiter()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Cannot have same delimiter and list delimiter of [,]");
new DelimitedInputFormat(Collections.singletonList("a\t"), ",", ",", null, false, 0);
}
@Test
public void testCustomizeSeparator()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Column[a|] has a customize separator: |, it cannot");
new DelimitedInputFormat(Collections.singletonList("a|"), ",", "|", null, false, 0);
}
}

View File

@ -42,7 +42,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
public class TsvReaderTest
public class DelimitedReaderTest
{
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
@ -66,7 +66,14 @@ public class TsvReaderTest
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0);
final DelimitedInputFormat format = new DelimitedInputFormat(
ImmutableList.of("ts", "name", "score"),
null,
null,
null,
false,
0
);
assertResult(source, format);
}
@ -81,7 +88,7 @@ public class TsvReaderTest
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 0);
final DelimitedInputFormat format = new DelimitedInputFormat(ImmutableList.of(), null, null, null, true, 0);
assertResult(source, format);
}
@ -96,7 +103,14 @@ public class TsvReaderTest
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1);
final DelimitedInputFormat format = new DelimitedInputFormat(
ImmutableList.of("ts", "name", "score"),
null,
null,
null,
false,
1
);
assertResult(source, format);
}
@ -112,7 +126,7 @@ public class TsvReaderTest
"2019-01-01T00:00:30Z\tname_3\t15"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 1);
final DelimitedInputFormat format = new DelimitedInputFormat(ImmutableList.of(), null, null, null, true, 1);
assertResult(source, format);
}
@ -127,7 +141,42 @@ public class TsvReaderTest
"2019-01-01T00:00:30Z\tname_3\t15|3"
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), "|", null, true, 0);
final DelimitedInputFormat format = new DelimitedInputFormat(ImmutableList.of(), "|", null, null, true, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(
DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)),
row.getTimestamp()
);
Assert.assertEquals(
StringUtils.format("name_%d", numResults + 1),
Iterables.getOnlyElement(row.getDimension("name"))
);
Assert.assertEquals(
ImmutableList.of(Integer.toString((numResults + 1) * 5), Integer.toString(numResults + 1)),
row.getDimension("score")
);
numResults++;
}
Assert.assertEquals(3, numResults);
}
}
@Test
public void testCustomizeSeparator() throws IOException
{
final ByteEntity source = writeData(
ImmutableList.of(
"ts|name|score",
"2019-01-01T00:00:10Z|name_1|5\t1",
"2019-01-01T00:00:20Z|name_2|10\t2",
"2019-01-01T00:00:30Z|name_3|15\t3"
)
);
final DelimitedInputFormat format = new DelimitedInputFormat(ImmutableList.of(), "\t", "|", null, true, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
@ -218,10 +267,11 @@ public class TsvReaderTest
ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z")
)
);
final TsvInputFormat format = new TsvInputFormat(
final DelimitedInputFormat format = new DelimitedInputFormat(
ImmutableList.of("Value", "Comment", "Timestamp"),
null,
null,
null,
false,
0
);
@ -252,7 +302,14 @@ public class TsvReaderTest
"2019-01-01T00:00:10Z\tname_1\t\"Как говорится: \\\"\"всё течет\t всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\""
)
);
final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0);
final DelimitedInputFormat format = new DelimitedInputFormat(
ImmutableList.of("ts", "name", "Comment"),
null,
null,
null,
false,
0
);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
@ -281,7 +338,7 @@ public class TsvReaderTest
return new ByteEntity(outputStream.toByteArray());
}
private void assertResult(ByteEntity source, TsvInputFormat format) throws IOException
private void assertResult(ByteEntity source, DelimitedInputFormat format) throws IOException
{
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;