diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntityReader.java b/core/src/main/java/org/apache/druid/data/input/InputEntityReader.java index fbef80506e8..2dd351c898a 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputEntityReader.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntityReader.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectWriter; import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import java.io.File; import java.io.IOException; /** @@ -44,7 +43,7 @@ public interface InputEntityReader */ ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter(); - CloseableIterator read(InputEntity source, File temporaryDirectory) throws IOException; + CloseableIterator read() throws IOException; - CloseableIterator sample(InputEntity source, File temporaryDirectory) throws IOException; + CloseableIterator sample() throws IOException; } diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java b/core/src/main/java/org/apache/druid/data/input/InputFormat.java index 8ac986ecb99..682a7bdaa95 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.annotations.UnstableApi; +import java.io.File; + /** * InputFormat abstracts the file format of input data. * It creates a {@link InputEntityReader} to read data and parse it into {@link InputRow}. @@ -53,5 +55,5 @@ public interface InputFormat @JsonIgnore boolean isSplittable(); - InputEntityReader createReader(InputRowSchema inputRowSchema); + InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory); } diff --git a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java index 41a7df47dd0..313498198eb 100644 --- a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java +++ b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java @@ -23,7 +23,6 @@ import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; -import java.io.File; import java.io.IOException; import java.util.List; @@ -37,9 +36,9 @@ import java.util.List; public abstract class IntermediateRowParsingReader implements InputEntityReader { @Override - public CloseableIterator read(InputEntity source, File temporaryDirectory) throws IOException + public CloseableIterator read() throws IOException { - return intermediateRowIterator(source, temporaryDirectory).flatMap(row -> { + return intermediateRowIterator().flatMap(row -> { try { // since parseInputRows() returns a list, the below line always iterates over the list, // which means it calls Iterator.hasNext() and Iterator.next() at least once per row. @@ -56,10 +55,10 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead } @Override - public CloseableIterator sample(InputEntity source, File temporaryDirectory) + public CloseableIterator sample() throws IOException { - return intermediateRowIterator(source, temporaryDirectory).map(row -> { + return intermediateRowIterator().map(row -> { final String json; try { json = toJson(row); @@ -83,7 +82,7 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead * Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and * {@link #toJson}. */ - protected abstract CloseableIterator intermediateRowIterator(InputEntity source, File temporaryDirectory) + protected abstract CloseableIterator intermediateRowIterator() throws IOException; /** diff --git a/core/src/main/java/org/apache/druid/data/input/TextReader.java b/core/src/main/java/org/apache/druid/data/input/TextReader.java index beb748e978c..33151bd0316 100644 --- a/core/src/main/java/org/apache/druid/data/input/TextReader.java +++ b/core/src/main/java/org/apache/druid/data/input/TextReader.java @@ -38,10 +38,14 @@ import java.util.List; public abstract class TextReader extends IntermediateRowParsingReader { private final InputRowSchema inputRowSchema; + final InputEntity source; + final File temporaryDirectory; - public TextReader(InputRowSchema inputRowSchema) + public TextReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { this.inputRowSchema = inputRowSchema; + this.source = source; + this.temporaryDirectory = temporaryDirectory; } public InputRowSchema getInputRowSchema() @@ -50,7 +54,7 @@ public abstract class TextReader extends IntermediateRowParsingReader } @Override - public CloseableIterator intermediateRowIterator(InputEntity source, File temporaryDirectory) + public CloseableIterator intermediateRowIterator() throws IOException { final LineIterator delegate = new LineIterator( diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 79c2c6b657d..c05f47f594e 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -31,6 +32,7 @@ import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.Property; import javax.annotation.Nullable; +import java.io.File; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -117,10 +119,12 @@ public class CsvInputFormat implements InputFormat } @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema) + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { return new CsvReader( inputRowSchema, + source, + temporaryDirectory, listDelimiter, columns, findColumnsFromHeader, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java index f21afe15384..2fb3d8f1794 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import com.opencsv.RFC4180Parser; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.TextReader; @@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.parsers.ParserUtils; import org.apache.druid.java.util.common.parsers.Parsers; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -51,13 +53,15 @@ public class CsvReader extends TextReader CsvReader( InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, @Nullable String listDelimiter, @Nullable List columns, boolean findColumnsFromHeader, int skipHeaderRows ) { - super(inputRowSchema); + super(inputRowSchema, source, temporaryDirectory); this.findColumnsFromHeader = findColumnsFromHeader; this.skipHeaderRows = skipHeaderRows; final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java b/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java index b726ec78af3..385bc5f1459 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java @@ -64,9 +64,9 @@ public class InputEntityIteratingReader implements InputSourceReader { return createIterator(entity -> { // InputEntityReader is stateful and so a new one should be created per entity. - final InputEntityReader reader = inputFormat.createReader(inputRowSchema); + final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); try { - return reader.read(entity, temporaryDirectory); + return reader.read(); } catch (IOException e) { throw new RuntimeException(e); @@ -79,9 +79,9 @@ public class InputEntityIteratingReader implements InputSourceReader { return createIterator(entity -> { // InputEntityReader is stateful and so a new one should be created per entity. - final InputEntityReader reader = inputFormat.createReader(inputRowSchema); + final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); try { - return reader.sample(entity, temporaryDirectory); + return reader.sample(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index e335d9ba671..0cf8a9fef09 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -23,11 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import javax.annotation.Nullable; +import java.io.File; import java.util.Collections; import java.util.Map; import java.util.Map.Entry; @@ -66,9 +68,9 @@ public class JsonInputFormat extends NestedInputFormat } @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema) + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new JsonReader(inputRowSchema, getFlattenSpec(), objectMapper); + return new JsonReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), objectMapper); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java index 61526efe691..ac2b38c2977 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.TextReader; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; +import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -40,9 +42,15 @@ public class JsonReader extends TextReader private final ObjectFlattener flattener; private final ObjectMapper mapper; - JsonReader(InputRowSchema inputRowSchema, JSONPathSpec flattenSpec, ObjectMapper mapper) + JsonReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + JSONPathSpec flattenSpec, + ObjectMapper mapper + ) { - super(inputRowSchema); + super(inputRowSchema, source, temporaryDirectory); this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker()); this.mapper = mapper; } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java index c988b120026..653fcc4c114 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java @@ -120,9 +120,9 @@ public class CsvReaderTest ) ); final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", true, 0); - final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA); + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); int numResults = 0; - try (CloseableIterator iterator = reader.read(source, null)) { + try (CloseableIterator iterator = reader.read()) { while (iterator.hasNext()) { final InputRow row = iterator.next(); Assert.assertEquals( @@ -216,10 +216,12 @@ public class CsvReaderTest new TimestampSpec("Timestamp", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))), Collections.emptyList() - ) + ), + source, + null ); - try (CloseableIterator iterator = reader.read(source, null)) { + try (CloseableIterator iterator = reader.read()) { final Iterator expectedRowIterator = expectedResults.iterator(); while (iterator.hasNext()) { Assert.assertTrue(expectedRowIterator.hasNext()); @@ -237,8 +239,8 @@ public class CsvReaderTest ) ); final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0); - final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA); - try (CloseableIterator iterator = reader.read(source, null)) { + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + try (CloseableIterator iterator = reader.read()) { Assert.assertTrue(iterator.hasNext()); final InputRow row = iterator.next(); Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp()); @@ -267,9 +269,9 @@ public class CsvReaderTest private void assertResult(ByteEntity source, CsvInputFormat format) throws IOException { - final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA); + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); int numResults = 0; - try (CloseableIterator iterator = reader.read(source, null)) { + try (CloseableIterator iterator = reader.read()) { while (iterator.hasNext()) { final InputRow row = iterator.next(); Assert.assertEquals( diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java index f91fe2d37ba..b0b773f427c 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java @@ -65,10 +65,12 @@ public class JsonReaderTest new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), Collections.emptyList() - ) + ), + source, + null ); final int numExpectedIterations = 1; - try (CloseableIterator iterator = reader.read(source, null)) { + try (CloseableIterator iterator = reader.read()) { int numActualIterations = 0; while (iterator.hasNext()) { final InputRow row = iterator.next(); @@ -112,11 +114,13 @@ public class JsonReaderTest new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))), Collections.emptyList() - ) + ), + source, + null ); final int numExpectedIterations = 1; - try (CloseableIterator iterator = reader.read(source, null)) { + try (CloseableIterator iterator = reader.read()) { int numActualIterations = 0; while (iterator.hasNext()) { final InputRow row = iterator.next(); diff --git a/core/src/test/java/org/apache/druid/data/input/impl/NoopInputFormat.java b/core/src/test/java/org/apache/druid/data/input/impl/NoopInputFormat.java index 8520671645b..a640bf1fb76 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/NoopInputFormat.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/NoopInputFormat.java @@ -19,10 +19,13 @@ package org.apache.druid.data.input.impl; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; +import java.io.File; + public class NoopInputFormat implements InputFormat { @Override @@ -32,7 +35,7 @@ public class NoopInputFormat implements InputFormat } @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema) + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { throw new UnsupportedOperationException(); }