refactor InputFormat and InputEntityReader implementations (#8875)

* refactor InputFormat and InputEntityReader to supply the InputEntity and temp dir to reader constructors instead of to read/sample

* fix style
Clint Wylie 2019-11-15 17:08:26 -08:00 committed by GitHub
parent 1611792855
commit 7fa3182fe5
12 changed files with 66 additions and 35 deletions
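
In caller terms, the change looks like the following sketch (the signatures are taken from the diffs below; the helper method and its printing of rows are illustrative only):

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.File;
import java.io.IOException;

public class ReadAllExample
{
  static void readAll(InputFormat format, InputRowSchema schema, InputEntity entity, File tmpDir)
      throws IOException
  {
    // Before this commit: format.createReader(schema) followed by reader.read(entity, tmpDir).
    // After: the entity and temp dir are bound at construction, and read() takes no arguments.
    final InputEntityReader reader = format.createReader(schema, entity, tmpDir);
    try (CloseableIterator<InputRow> rows = reader.read()) {
      while (rows.hasNext()) {
        System.out.println(rows.next());
      }
    }
  }
}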

InputEntityReader.java

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.druid.guice.annotations.UnstableApi;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import java.io.File;
 import java.io.IOException;

 /**
@@ -44,7 +43,7 @@ public interface InputEntityReader
   */
  ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();

-  CloseableIterator<InputRow> read(InputEntity source, File temporaryDirectory) throws IOException;
+  CloseableIterator<InputRow> read() throws IOException;

-  CloseableIterator<InputRowListPlusJson> sample(InputEntity source, File temporaryDirectory) throws IOException;
+  CloseableIterator<InputRowListPlusJson> sample() throws IOException;
 }

InputFormat.java

@@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.annotations.UnstableApi;
+import java.io.File;
+
 /**
  * InputFormat abstracts the file format of input data.
  * It creates a {@link InputEntityReader} to read data and parse it into {@link InputRow}.
@@ -53,5 +55,5 @@ public interface InputFormat
   @JsonIgnore
   boolean isSplittable();

-  InputEntityReader createReader(InputRowSchema inputRowSchema);
+  InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory);
 }
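
Taken together with InputEntityReader above, a hypothetical no-op implementation of the new factory signature could look like this (EmptyInputFormat is not part of this commit; it assumes isSplittable() and createReader() are the interface's only abstract methods, as the diffs here suggest):

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusJson;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.File;
import java.util.Collections;

public class EmptyInputFormat implements InputFormat
{
  @Override
  public boolean isSplittable()
  {
    return false;
  }

  @Override
  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
  {
    // A real format would hand 'source' and 'temporaryDirectory' to its reader's constructor.
    return new InputEntityReader()
    {
      @Override
      public CloseableIterator<InputRow> read()
      {
        return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
      }

      @Override
      public CloseableIterator<InputRowListPlusJson> sample()
      {
        return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
      }
    };
  }
}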

IntermediateRowParsingReader.java

@@ -23,7 +23,6 @@ import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -37,9 +36,9 @@ import java.util.List;
 public abstract class IntermediateRowParsingReader<T> implements InputEntityReader
 {
   @Override
-  public CloseableIterator<InputRow> read(InputEntity source, File temporaryDirectory) throws IOException
+  public CloseableIterator<InputRow> read() throws IOException
   {
-    return intermediateRowIterator(source, temporaryDirectory).flatMap(row -> {
+    return intermediateRowIterator().flatMap(row -> {
       try {
         // since parseInputRows() returns a list, the below line always iterates over the list,
         // which means it calls Iterator.hasNext() and Iterator.next() at least once per row.
@@ -56,10 +55,10 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityReader
   }

   @Override
-  public CloseableIterator<InputRowListPlusJson> sample(InputEntity source, File temporaryDirectory)
+  public CloseableIterator<InputRowListPlusJson> sample()
       throws IOException
   {
-    return intermediateRowIterator(source, temporaryDirectory).map(row -> {
+    return intermediateRowIterator().map(row -> {
       final String json;
       try {
         json = toJson(row);
@@ -83,7 +82,7 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityReader
    * Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and
    * {@link #toJson}.
    */
-  protected abstract CloseableIterator<T> intermediateRowIterator(InputEntity source, File temporaryDirectory)
+  protected abstract CloseableIterator<T> intermediateRowIterator()
       throws IOException;

   /**
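
Because the template methods above no longer receive arguments, any per-entity state has to arrive through a subclass constructor. A minimal sketch (FixedRowsReader is hypothetical; the parseInputRows/toJson signatures are assumed from the javadoc references above, since this hunk does not show them):

import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

class FixedRowsReader extends IntermediateRowParsingReader<String>
{
  private final List<String> lines;

  FixedRowsReader(List<String> lines)
  {
    // State that read()/sample() used to receive now arrives at construction time.
    this.lines = lines;
  }

  @Override
  protected CloseableIterator<String> intermediateRowIterator()
  {
    return CloseableIterators.withEmptyBaggage(lines.iterator());
  }

  @Override
  protected List<InputRow> parseInputRows(String intermediateRow)
  {
    // Trivial parse: each line becomes one row with a single "value" dimension.
    return Collections.singletonList(
        new MapBasedInputRow(
            DateTimes.nowUtc(),
            Collections.singletonList("value"),
            Collections.singletonMap("value", intermediateRow)
        )
    );
  }

  @Override
  protected String toJson(String intermediateRow) throws IOException
  {
    return DEFAULT_JSON_WRITER.writeValueAsString(intermediateRow);
  }
}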

TextReader.java

@@ -38,10 +38,14 @@ import java.util.List;
 public abstract class TextReader extends IntermediateRowParsingReader<String>
 {
   private final InputRowSchema inputRowSchema;
+  final InputEntity source;
+  final File temporaryDirectory;

-  public TextReader(InputRowSchema inputRowSchema)
+  public TextReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
     this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.temporaryDirectory = temporaryDirectory;
   }

   public InputRowSchema getInputRowSchema()
@@ -50,7 +54,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
   }

   @Override
-  public CloseableIterator<String> intermediateRowIterator(InputEntity source, File temporaryDirectory)
+  public CloseableIterator<String> intermediateRowIterator()
       throws IOException
   {
     final LineIterator delegate = new LineIterator(

CsvInputFormat.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
@@ -31,6 +32,7 @@ import org.apache.druid.indexer.Checks;
 import org.apache.druid.indexer.Property;

 import javax.annotation.Nullable;
+import java.io.File;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -117,10 +119,12 @@ public class CsvInputFormat implements InputFormat
   }

   @Override
-  public InputEntityReader createReader(InputRowSchema inputRowSchema)
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
     return new CsvReader(
         inputRowSchema,
+        source,
+        temporaryDirectory,
         listDelimiter,
         columns,
         findColumnsFromHeader,

CsvReader.java

@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.opencsv.RFC4180Parser;
+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.TextReader;
@@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.parsers.ParserUtils;
 import org.apache.druid.java.util.common.parsers.Parsers;

 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,13 +53,15 @@ public class CsvReader extends TextReader
   CsvReader(
       InputRowSchema inputRowSchema,
+      InputEntity source,
+      File temporaryDirectory,
       @Nullable String listDelimiter,
       @Nullable List<String> columns,
       boolean findColumnsFromHeader,
       int skipHeaderRows
   )
   {
-    super(inputRowSchema);
+    super(inputRowSchema, source, temporaryDirectory);
     this.findColumnsFromHeader = findColumnsFromHeader;
     this.skipHeaderRows = skipHeaderRows;
     final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;

InputEntityIteratingReader.java

@@ -64,9 +64,9 @@ public class InputEntityIteratingReader implements InputSourceReader
 {
   return createIterator(entity -> {
     // InputEntityReader is stateful and so a new one should be created per entity.
-    final InputEntityReader reader = inputFormat.createReader(inputRowSchema);
+    final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
     try {
-      return reader.read(entity, temporaryDirectory);
+      return reader.read();
     }
     catch (IOException e) {
       throw new RuntimeException(e);
@@ -79,9 +79,9 @@ public class InputEntityIteratingReader implements InputSourceReader
 {
   return createIterator(entity -> {
     // InputEntityReader is stateful and so a new one should be created per entity.
-    final InputEntityReader reader = inputFormat.createReader(inputRowSchema);
+    final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
     try {
-      return reader.sample(entity, temporaryDirectory);
+      return reader.sample();
     }
     catch (IOException e) {
       throw new RuntimeException(e);
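
The "stateful" comment above is the rationale for the whole refactor: binding the entity in the constructor makes reader reuse across entities impossible by construction. The sampling path follows the same per-entity pattern, sketched here (the entity list and println consumer are illustrative):

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowListPlusJson;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class SampleEachExample
{
  static void sampleEach(InputFormat format, InputRowSchema schema, List<InputEntity> entities, File tmpDir)
      throws IOException
  {
    for (InputEntity entity : entities) {
      // A fresh reader per entity, mirroring InputEntityIteratingReader above.
      final InputEntityReader reader = format.createReader(schema, entity, tmpDir);
      try (CloseableIterator<InputRowListPlusJson> samples = reader.sample()) {
        samples.forEachRemaining(System.out::println);
      }
    }
  }
}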

JsonInputFormat.java

@@ -23,11 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;

 import javax.annotation.Nullable;
+import java.io.File;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -66,9 +68,9 @@ public class JsonInputFormat extends NestedInputFormat
   }

   @Override
-  public InputEntityReader createReader(InputRowSchema inputRowSchema)
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    return new JsonReader(inputRowSchema, getFlattenSpec(), objectMapper);
+    return new JsonReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), objectMapper);
   }

   @Override

JsonReader.java

@@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;

 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.TextReader;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.apache.druid.java.util.common.parsers.ParseException;

+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -40,9 +42,15 @@ public class JsonReader extends TextReader
   private final ObjectFlattener<JsonNode> flattener;
   private final ObjectMapper mapper;

-  JsonReader(InputRowSchema inputRowSchema, JSONPathSpec flattenSpec, ObjectMapper mapper)
+  JsonReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      File temporaryDirectory,
+      JSONPathSpec flattenSpec,
+      ObjectMapper mapper
+  )
   {
-    super(inputRowSchema);
+    super(inputRowSchema, source, temporaryDirectory);
     this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker());
     this.mapper = mapper;
   }

CsvReaderTest.java

@@ -120,9 +120,9 @@ public class CsvReaderTest
         )
     );
     final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", true, 0);
-    final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA);
+    final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
     int numResults = 0;
-    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
       while (iterator.hasNext()) {
         final InputRow row = iterator.next();
         Assert.assertEquals(
@@ -216,10 +216,12 @@ public class CsvReaderTest
             new TimestampSpec("Timestamp", "auto", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))),
             Collections.emptyList()
-        )
+        ),
+        source,
+        null
     );
-    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
       final Iterator<InputRow> expectedRowIterator = expectedResults.iterator();
       while (iterator.hasNext()) {
         Assert.assertTrue(expectedRowIterator.hasNext());
@@ -237,8 +239,8 @@ public class CsvReaderTest
         )
     );
     final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0);
-    final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA);
-    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+    final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
       Assert.assertTrue(iterator.hasNext());
       final InputRow row = iterator.next();
       Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp());
@@ -267,9 +269,9 @@ public class CsvReaderTest
   private void assertResult(ByteEntity source, CsvInputFormat format) throws IOException
   {
-    final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA);
+    final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
     int numResults = 0;
-    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
       while (iterator.hasNext()) {
         final InputRow row = iterator.next();
         Assert.assertEquals(

JsonReaderTest.java

@@ -65,10 +65,12 @@ public class JsonReaderTest
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
             Collections.emptyList()
-        )
+        ),
+        source,
+        null
     );
     final int numExpectedIterations = 1;
-    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
         final InputRow row = iterator.next();
@@ -112,11 +114,13 @@ public class JsonReaderTest
             new TimestampSpec("timestamp", "iso", null),
             new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
             Collections.emptyList()
-        )
+        ),
+        source,
+        null
     );
     final int numExpectedIterations = 1;
-    try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
         final InputRow row = iterator.next();

NoopInputFormat.java

@@ -19,10 +19,13 @@
 package org.apache.druid.data.input.impl;

+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;

+import java.io.File;
+
 public class NoopInputFormat implements InputFormat
 {
   @Override
@@ -32,7 +35,7 @@ public class NoopInputFormat implements InputFormat
   }

   @Override
-  public InputEntityReader createReader(InputRowSchema inputRowSchema)
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
     throw new UnsupportedOperationException();
   }