From d3d0c1c91ec228209ff9aa45155998937aa8e8d6 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 18 Jan 2024 03:18:46 -0800 Subject: [PATCH] Faster parsing: reduce String usage, list-based input rows. (#15681) * Faster parsing: reduce String usage, list-based input rows. Three changes: 1) Reworked FastLineIterator to optionally avoid generating Strings entirely, and reduce copying somewhat. Benefits the line-oriented JSON, CSV, delimited (TSV), and regex formats. 2) In the delimited (TSV) format, when the delimiter is a single byte, split on UTF-8 bytes directly. 3) In CSV and delimited (TSV) formats, use list-based input rows when the column list is provided upfront by the user. * Fix style. * Fix inspections. * Restore validation. * Remove fastutil-extra. * Exception type. * Fixes for error messages. * Fixes for null handling. --- .../DelimitedInputFormatBenchmark.java | 167 ++++++++++++ .../indexing/CountableInputSourceReader.java | 7 + .../msq/input/external/ExternalSegment.java | 3 +- .../sampler/InputSourceSamplerTest.java | 4 +- pom.xml | 5 - processing/pom.xml | 4 - .../apache/druid/data/input/InputFormat.java | 13 + .../druid/data/input/InputSourceReader.java | 10 + .../input/IntermediateRowParsingReader.java | 30 ++- .../druid/data/input/ListBasedInputRow.java | 208 +++++++++++++++ .../data/input/ListBasedInputRowAdapter.java | 60 +++++ .../druid/data/input/MapBasedInputRow.java | 2 +- .../apache/druid/data/input/TextReader.java | 73 ++++-- .../druid/data/input/impl/CsvInputFormat.java | 4 +- .../druid/data/input/impl/DelimitedBytes.java | 67 +++++ .../data/input/impl/DelimitedInputFormat.java | 26 +- .../data/input/impl/DelimitedValueReader.java | 96 +++++-- .../data/input/impl/FastLineIterator.java | 239 +++++++++++++----- .../data/input/impl/FlatTextInputFormat.java | 65 +++-- .../impl/InputEntityIteratingReader.java | 7 + .../druid/data/input/impl/JsonLineReader.java | 8 +- .../data/input/impl/MapInputRowParser.java | 54 +++- 
.../druid/data/input/impl/RegexReader.java | 2 +- .../impl/TimedShutoffInputSourceReader.java | 7 + .../druid/java/util/common/StringUtils.java | 6 - .../parsers/AbstractFlatTextFormatParser.java | 2 +- .../segment/incremental/IncrementalIndex.java | 9 +- .../druid/data/input/impl/CsvReaderTest.java | 52 ++-- .../data/input/impl/DelimitedBytesTest.java | 88 +++++++ .../data/input/impl/FastLineIteratorTest.java | 70 ++--- 30 files changed, 1178 insertions(+), 210 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/DelimitedInputFormatBenchmark.java create mode 100644 processing/src/main/java/org/apache/druid/data/input/ListBasedInputRow.java create mode 100644 processing/src/main/java/org/apache/druid/data/input/ListBasedInputRowAdapter.java create mode 100644 processing/src/main/java/org/apache/druid/data/input/impl/DelimitedBytes.java create mode 100644 processing/src/test/java/org/apache/druid/data/input/impl/DelimitedBytesTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DelimitedInputFormatBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DelimitedInputFormatBenchmark.java new file mode 100644 index 00000000000..cabd58a4b7e --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DelimitedInputFormatBenchmark.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DelimitedInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; 
+import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@Fork(value = 1) +public class DelimitedInputFormatBenchmark +{ + private static final int NUM_EVENTS = 1200; + + private static final List COLUMNS = + ImmutableList.of( + "timestamp", + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + "added", + "deleted", + "delta" + ); + + static { + NullHandling.initializeForTests(); + } + + @Param({"false", "true"}) + private boolean fromHeader; + + InputEntityReader reader; + DelimitedInputFormat format; + byte[] data; + + @Setup(Level.Invocation) + public void prepareReader() + { + ByteEntity source = new ByteEntity(data); + reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + DimensionsSpec.builder().setIncludeAllDimensions(true).build(), + ColumnsFilter.all() + ), + source, + null + ); + } + + @Setup(Level.Trial) + public void prepareData() throws Exception + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final String headerString = + "timestamp\tpage\tlanguage\tuser\tunpatrolled\tnewPage\trobot\tanonymous\tnamespace\tcontinent\tcountry\tregion\tcity\tadded\tdeleted\tdelta\n"; + final String dataString = + "2013-08-31T01:02:33Z\tGypsy Danger\ten\tnuclear\ttrue\ttrue\tfalse\tfalse\tarticle\tNorth America\tUnited States\tBay Area\tSan Francisco\t57\t200\t-143\n" + + "2013-08-31T03:32:45Z\tStriker Eureka\ten\tspeed\tfalse\ttrue\ttrue\tfalse\twikipedia\tAustralia\tAustralia\tCantebury\tSyndey\t459\t129\t330\n" + + "2013-08-31T07:11:21Z\tCherno Alpha\tru\tmasterYi\tfalse\ttrue\ttrue\tfalse\tarticle\tAsia\tRussia\tOblast\tMoscow\t123\t12\t111\n"; + + baos.write(StringUtils.toUtf8(headerString)); + + for (int i = 0; i < NUM_EVENTS / 3; i++) { + baos.write(StringUtils.toUtf8(dataString)); + } + + data = 
baos.toByteArray(); + } + + @Setup(Level.Trial) + public void prepareFormat() + { + format = new DelimitedInputFormat(fromHeader ? null : COLUMNS, null, "\t", null, fromHeader, fromHeader ? 0 : 1); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void baseline(final Blackhole blackhole) throws IOException + { + int counted = 0; + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + final InputRow row = iterator.next(); + if (row != null) { + counted += 1; + } + blackhole.consume(row); + } + } + + if (counted != NUM_EVENTS) { + throw new RuntimeException("invalid number of loops, counted = " + counted); + } + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(DelimitedInputFormatBenchmark.class.getSimpleName()) + .build(); + + new Runner(opt).run(); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountableInputSourceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountableInputSourceReader.java index fce2de14b5b..e98873c89c7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountableInputSourceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/CountableInputSourceReader.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputStats; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.segment.RowAdapter; import java.io.IOException; @@ -56,4 +57,10 @@ public class CountableInputSourceReader implements InputSourceReader { return inputSourceReader.sample(); } + + @Override + public RowAdapter rowAdapter() + { + return inputSourceReader.rowAdapter(); + } } diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalSegment.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalSegment.java index 93f24cbdff6..4b69a930b1a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalSegment.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalSegment.java @@ -29,7 +29,6 @@ import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.WarningCounters; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; -import org.apache.druid.segment.RowAdapters; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.SegmentId; @@ -142,7 +141,7 @@ public class ExternalSegment extends RowBasedSegment } } ), - RowAdapters.standardRow(), + reader.rowAdapter(), signature ); this.inputSource = inputSource; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index 5f6375e6297..087b12cef40 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1165,8 +1165,8 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest // the last row has parse exception when indexing, check if rawColumns and exception message match the expected // String indexParseExceptioMessage = ParserType.STR_CSV.equals(parserType) - ? 
"Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]" - : "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"; + ? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row={timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]" + : "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row={timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"; assertEqualsSamplerResponseRow( new SamplerResponseRow( rawColumns4ParseExceptionRow, diff --git a/pom.xml b/pom.xml index 6149c5866db..3c5c08ae202 100644 --- a/pom.xml +++ b/pom.xml @@ -910,11 +910,6 @@ fastutil-core ${fastutil.version} - - it.unimi.dsi - fastutil-extra - ${fastutil.version} - com.opencsv opencsv diff --git a/processing/pom.xml b/processing/pom.xml index 7f4a6d7fffd..e81e7058e47 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -233,10 +233,6 @@ it.unimi.dsi fastutil-core - - it.unimi.dsi - fastutil-extra - org.antlr antlr4-runtime diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index 8abb213b74c..ebf2dea6687 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java 
+++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -30,6 +30,8 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.RowAdapters; import org.apache.druid.utils.CompressionUtils; import java.io.File; @@ -85,4 +87,15 @@ public interface InputFormat { return size; } + + /** + * Returns an adapter that can read the rows from {@link #createReader(InputRowSchema, InputEntity, File)}, + * given a particular {@link InputRowSchema}. Note that {@link RowAdapters#standardRow()} always works, but the + * one returned by this method may be more performant. + */ + @SuppressWarnings("unused") // inputRowSchema is currently unused, but may be used in the future for ColumnsFilter + default RowAdapter createRowAdapter(InputRowSchema inputRowSchema) + { + return RowAdapters.standardRow(); + } } diff --git a/processing/src/main/java/org/apache/druid/data/input/InputSourceReader.java b/processing/src/main/java/org/apache/druid/data/input/InputSourceReader.java index 23cbcf32fd8..0b7fcd796f0 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputSourceReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputSourceReader.java @@ -22,6 +22,8 @@ package org.apache.druid.data.input; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.RowAdapters; import java.io.IOException; @@ -45,4 +47,12 @@ public interface InputSourceReader CloseableIterator read(InputStats inputStats) throws IOException; CloseableIterator sample() throws IOException; + + /** + * Returns 
an adapter that can be used to read the rows from {@link #read()}. + */ + default RowAdapter rowAdapter() + { + return RowAdapters.standardRow(); + } } diff --git a/processing/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java b/processing/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java index de0b58a12f3..6c98f823311 100644 --- a/processing/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java @@ -73,11 +73,12 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead } catch (IOException e) { final Map metadata = intermediateRowIteratorWithMetadata.currentMetadata(); + final String rowAsString = intermediateRowAsString(row); rows = new ExceptionThrowingIterator(new ParseException( - String.valueOf(row), + rowAsString, e, buildParseExceptionMessage( - StringUtils.format("Unable to parse row [%s]", row), + StringUtils.format("Unable to parse row [%s]", rowAsString), source(), currentRecordNumber, metadata @@ -166,10 +167,11 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead rawColumnsList = toMap(row); } catch (Exception e) { + final String rowAsString = intermediateRowAsString(row); return InputRowListPlusRawValues.of( null, - new ParseException(String.valueOf(row), e, buildParseExceptionMessage( - StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", row), + new ParseException(rowAsString, e, buildParseExceptionMessage( + StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", rowAsString), source(), null, metadata @@ -178,10 +180,11 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead } if (CollectionUtils.isNullOrEmpty(rawColumnsList)) { + final String rowAsString = intermediateRowAsString(row); return InputRowListPlusRawValues.of( null, - new ParseException(String.valueOf(row), 
buildParseExceptionMessage( - StringUtils.nonStrictFormat("No map object parsed for row [%s]", row), + new ParseException(rowAsString, buildParseExceptionMessage( + StringUtils.nonStrictFormat("No map object parsed for row [%s]", rowAsString), source(), null, metadata @@ -195,14 +198,15 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead } catch (ParseException e) { return InputRowListPlusRawValues.ofList(rawColumnsList, new ParseException( - String.valueOf(row), + intermediateRowAsString(row), e, buildParseExceptionMessage(e.getMessage(), source(), null, metadata) )); } catch (IOException e) { - ParseException exception = new ParseException(String.valueOf(row), e, buildParseExceptionMessage( - StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", row), + final String rowAsString = intermediateRowAsString(row); + ParseException exception = new ParseException(rowAsString, e, buildParseExceptionMessage( + StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", rowAsString), source(), null, metadata @@ -231,6 +235,14 @@ public abstract class IntermediateRowParsingReader implements InputEntityRead return CloseableIteratorWithMetadata.withEmptyMetadata(intermediateRowIterator()); } + /** + * String representation of an intermediate row. Used for error messages. + */ + protected String intermediateRowAsString(@Nullable T row) + { + return String.valueOf(row); + } + /** * @return InputEntity which the implementation is reading from. Useful in generating informative {@link ParseException}s. 
* For example, in case of {@link org.apache.druid.data.input.impl.FileEntity}, file name containing erroneous records diff --git a/processing/src/main/java/org/apache/druid/data/input/ListBasedInputRow.java b/processing/src/main/java/org/apache/druid/data/input/ListBasedInputRow.java new file mode 100644 index 00000000000..21c6cb104d1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/ListBasedInputRow.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.column.RowSignature; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Input row backed by a {@link List}. This implementation is most useful when the columns and dimensions are known + * ahead of time, prior to encountering any data. 
It is used in concert with {@link ListBasedInputRowAdapter}, which + * enables reading column values by position rather than by name. + */ +public class ListBasedInputRow implements InputRow +{ + private final RowSignature signature; + private final DateTime timestamp; + private final List dimensions; + private final List data; + + /** + * Create a row. + * + * @param signature signature; must match the "data" list + * @param timestamp row timestamp + * @param dimensions dimensions to be reported by {@link #getDimensions()} + * @param data row data + */ + public ListBasedInputRow( + final RowSignature signature, + final DateTime timestamp, + final List dimensions, + final List data + ) + { + this.signature = signature; + this.timestamp = timestamp; + this.dimensions = dimensions; + this.data = data; + } + + /** + * Create a row and parse a timestamp. Throws {@link ParseException} if the timestamp cannot be parsed. + * + * @param signature signature; must match the "data" list + * @param timestampSpec timestamp spec + * @param dimensions dimensions to be reported by {@link #getDimensions()} + * @param data row data + */ + public static InputRow parse( + RowSignature signature, + TimestampSpec timestampSpec, + List dimensions, + List data + ) + { + final DateTime timestamp = parseTimestamp(timestampSpec, data, signature); + return new ListBasedInputRow(signature, timestamp, dimensions, data); + } + + @Override + public List getDimensions() + { + return dimensions; + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp.getMillis(); + } + + @Override + public DateTime getTimestamp() + { + return timestamp; + } + + @Override + public List getDimension(String dimension) + { + return Rows.objectToStrings(getRaw(dimension)); + } + + @Nullable + @Override + public Object getRaw(String columnName) + { + final int i = signature.indexOf(columnName); + + if (i < 0 || i >= data.size()) { + return null; + } else { + return data.get(i); + } + } + + @Nullable + public Object getRaw(int columnNumber) + { + if
(columnNumber < data.size()) { + return data.get(columnNumber); + } else { + // Row may have ended early, which is OK. + return null; + } + } + + @Nullable + @Override + public Number getMetric(String metric) + { + return Rows.objectToNumber(metric, getRaw(metric), true); + } + + @Override + public int compareTo(Row o) + { + return timestamp.compareTo(o.getTimestamp()); + } + + public Map asMap() + { + return Utils.zipMapPartial(signature.getColumnNames(), data); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ListBasedInputRow that = (ListBasedInputRow) o; + return Objects.equals(dimensions, that.dimensions) + && Objects.equals(signature, that.signature) + && Objects.equals(data, that.data); + } + + @Override + public int hashCode() + { + return Objects.hash(dimensions, signature, data); + } + + @Override + public String toString() + { + return "{" + + "timestamp=" + DateTimes.utc(getTimestampFromEpoch()) + + ", event=" + asMap() + + ", dimensions=" + dimensions + + '}'; + } + + /** + * Helper for {@link #parse(RowSignature, TimestampSpec, List, List)}. 
+ */ + private static DateTime parseTimestamp( + final TimestampSpec timestampSpec, + final List theList, + final RowSignature signature + ) + { + final int timeColumnIndex = signature.indexOf(timestampSpec.getTimestampColumn()); + final Object timeValue; + + if (theList != null && timeColumnIndex >= 0 && timeColumnIndex < theList.size()) { + timeValue = theList.get(timeColumnIndex); + } else { + timeValue = null; + } + + return MapInputRowParser.parseTimestampOrThrowParseException( + timeValue, + timestampSpec, + () -> Utils.zipMapPartial(signature.getColumnNames(), theList) + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/ListBasedInputRowAdapter.java b/processing/src/main/java/org/apache/druid/data/input/ListBasedInputRowAdapter.java new file mode 100644 index 00000000000..980949782af --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/ListBasedInputRowAdapter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.RowAdapters; +import org.apache.druid.segment.column.RowSignature; + +import java.util.function.Function; +import java.util.function.ToLongFunction; + +/** + * Adapter for reading {@link ListBasedInputRow}. The {@link RowAdapters#standardRow()} would also work, but this + * one is faster because it avoids per-row field-name-to-index lookups. Must be instantiated with the same + * {@link RowSignature} as the {@link ListBasedInputRow} instances that are to be read. + */ +public class ListBasedInputRowAdapter implements RowAdapter +{ + private final RowSignature fields; + + public ListBasedInputRowAdapter(final RowSignature fields) + { + this.fields = fields; + } + + @Override + public ToLongFunction timestampFunction() + { + return Row::getTimestampFromEpoch; + } + + @Override + public Function columnFunction(String columnName) + { + final int i = fields.indexOf(columnName); + if (i < 0) { + return row -> null; + } else { + // Get by column number, not name.
+ return row -> ((ListBasedInputRow) row).getRaw(i); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java b/processing/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java index e9117f26911..e4021c7e0d7 100644 --- a/processing/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java +++ b/processing/src/main/java/org/apache/druid/data/input/MapBasedInputRow.java @@ -86,7 +86,7 @@ public class MapBasedInputRow extends MapBasedRow implements InputRow @Override public String toString() { - return "MapBasedInputRow{" + + return "{" + "timestamp=" + DateTimes.utc(getTimestampFromEpoch()) + ", event=" + getEvent() + ", dimensions=" + dimensions + diff --git a/processing/src/main/java/org/apache/druid/data/input/TextReader.java b/processing/src/main/java/org/apache/druid/data/input/TextReader.java index 45685ceb306..af092442fb1 100644 --- a/processing/src/main/java/org/apache/druid/data/input/TextReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/TextReader.java @@ -19,14 +19,17 @@ package org.apache.druid.data.input; -import com.google.common.base.Strings; import org.apache.druid.data.input.impl.FastLineIterator; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParserUtils; +import org.apache.druid.segment.column.RowSignature; +import javax.annotation.Nullable; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -35,12 +38,12 @@ import java.util.Map; /** * Abstract {@link InputEntityReader} for text format readers such as CSV or JSON. 
*/ -public abstract class TextReader extends IntermediateRowParsingReader +public abstract class TextReader extends IntermediateRowParsingReader { private final InputRowSchema inputRowSchema; private final InputEntity source; - public TextReader(InputRowSchema inputRowSchema, InputEntity source) + protected TextReader(InputRowSchema inputRowSchema, InputEntity source) { this.inputRowSchema = inputRowSchema; this.source = source; @@ -52,9 +55,9 @@ public abstract class TextReader extends IntermediateRowParsingReader } @Override - public CloseableIteratorWithMetadata intermediateRowIteratorWithMetadata() throws IOException + public CloseableIteratorWithMetadata intermediateRowIteratorWithMetadata() throws IOException { - final CloseableIterator delegate = new FastLineIterator(source.open()); + final CloseableIterator delegate = makeSourceIterator(source.open()); final int numHeaderLines = getNumHeaderLinesToSkip(); for (int i = 0; i < numHeaderLines && delegate.hasNext(); i++) { delegate.next(); // skip lines @@ -63,7 +66,7 @@ public abstract class TextReader extends IntermediateRowParsingReader processHeaderLine(delegate.next()); } - return new CloseableIteratorWithMetadata() + return new CloseableIteratorWithMetadata() { private static final String LINE_KEY = "Line"; private long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0); @@ -81,7 +84,7 @@ public abstract class TextReader extends IntermediateRowParsingReader } @Override - public String next() + public T next() { currentLineNumber++; return delegate.next(); @@ -108,7 +111,7 @@ public abstract class TextReader extends IntermediateRowParsingReader * This method will be called after {@link #getNumHeaderLinesToSkip()} and {@link #processHeaderLine}. 
*/ @Override - public abstract List parseInputRows(String intermediateRow) throws IOException, ParseException; + public abstract List parseInputRows(T intermediateRow) throws IOException, ParseException; /** * Returns the number of header lines to skip. @@ -125,23 +128,61 @@ public abstract class TextReader extends IntermediateRowParsingReader /** * Processes a header line. This will be called if {@link #needsToProcessHeaderLine()} = true. */ - public abstract void processHeaderLine(String line) throws IOException; + public abstract void processHeaderLine(T line) throws IOException; - public static List findOrCreateColumnNames(List parsedLine) + protected abstract CloseableIterator makeSourceIterator(InputStream in); + + public static RowSignature findOrCreateInputRowSignature(List parsedLine) { final List columns = new ArrayList<>(parsedLine.size()); for (int i = 0; i < parsedLine.size(); i++) { - if (Strings.isNullOrEmpty(parsedLine.get(i))) { + if (com.google.common.base.Strings.isNullOrEmpty(parsedLine.get(i))) { columns.add(ParserUtils.getDefaultColumnName(i)); } else { columns.add(parsedLine.get(i)); } } - if (columns.isEmpty()) { - return ParserUtils.generateFieldNames(parsedLine.size()); - } else { - ParserUtils.validateFields(columns); - return columns; + + ParserUtils.validateFields(columns); + final RowSignature.Builder builder = RowSignature.builder(); + for (final String column : columns) { + builder.add(column, null); + } + return builder.build(); + } + + public abstract static class Strings extends TextReader + { + protected Strings(InputRowSchema inputRowSchema, InputEntity source) + { + super(inputRowSchema, source); + } + + @Override + protected CloseableIterator makeSourceIterator(InputStream in) + { + return new FastLineIterator.Strings(in); + } + } + + public abstract static class Bytes extends TextReader + { + protected Bytes(InputRowSchema inputRowSchema, InputEntity source) + { + super(inputRowSchema, source); + } + + @Override + protected 
CloseableIterator makeSourceIterator(InputStream in) + { + return new FastLineIterator.Bytes(in); + } + + @Override + protected String intermediateRowAsString(@Nullable byte[] row) + { + // Like String.valueOf, but for UTF-8 bytes. Keeps error messages consistent between String and Bytes. + return row == null ? "null" : StringUtils.fromUtf8(row); } } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 03706d1ef44..a041e031a3b 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -29,6 +29,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; import java.io.File; @@ -78,7 +79,8 @@ public class CsvInputFormat extends FlatTextInputFormat getColumns(), isFindColumnsFromHeader(), getSkipHeaderRows(), - line -> Arrays.asList(parser.parseLine(line)) + line -> Arrays.asList(parser.parseLine(StringUtils.fromUtf8(line))), + useListBasedInputRows() ); } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedBytes.java b/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedBytes.java new file mode 100644 index 00000000000..0ac45ee7c13 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedBytes.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utility function for {@link DelimitedInputFormat}. + */ +public class DelimitedBytes +{ + /** + * Parameter for {@link #split(byte[], byte, int)} signifying that we do not know the expected number of fields. + */ + public static final int UNKNOWN_FIELD_COUNT = -1; + + /** + * Split UTF-8 bytes by a particular delimiter. When {@link NullHandling#sqlCompatible()}, empty parts are + * returned as nulls. When {@link NullHandling#replaceWithDefault()}, empty parts are returned as empty strings. + * + * @param bytes utf-8 bytes + * @param delimiter the delimiter + * @param numFieldsHint expected number of fields, or {@link #UNKNOWN_FIELD_COUNT} + */ + public static List split(final byte[] bytes, final byte delimiter, final int numFieldsHint) + { + final List out = numFieldsHint == UNKNOWN_FIELD_COUNT ? new ArrayList<>() : new ArrayList<>(numFieldsHint); + + int start = 0; + int position = 0; + + while (position < bytes.length) { + if (bytes[position] == delimiter) { + final String s = StringUtils.fromUtf8(bytes, start, position - start); + out.add(s.isEmpty() && NullHandling.sqlCompatible() ? 
null : s); + start = position + 1; + } + + position++; + } + + final String s = StringUtils.fromUtf8(bytes, start, position - start); + out.add(s.isEmpty() && NullHandling.sqlCompatible() ? null : s); + return out; + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java index d409f7ef12e..238a2f4dc2c 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java @@ -27,6 +27,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; import java.io.File; @@ -80,10 +81,33 @@ public class DelimitedInputFormat extends FlatTextInputFormat getColumns(), isFindColumnsFromHeader(), getSkipHeaderRows(), - line -> splitToList(Splitter.on(getDelimiter()), line) + makeDelimitedValueParser( + getDelimiter(), + useListBasedInputRows() ? getColumns().size() : DelimitedBytes.UNKNOWN_FIELD_COUNT + ), + useListBasedInputRows() ); } + /** + * Create a parser for a particular delimiter and expected number of fields. 
+ */ + private static DelimitedValueReader.DelimitedValueParser makeDelimitedValueParser( + final String delimiter, + final int numFields + ) + { + final byte[] utf8Delimiter = StringUtils.toUtf8(delimiter); + if (utf8Delimiter.length == 1) { + // Single-byte delimiter: split bytes directly, prior to UTF-8 conversion + final byte delimiterByte = utf8Delimiter[0]; + return bytes -> DelimitedBytes.split(bytes, delimiterByte, numFields); + } else { + final Splitter splitter = Splitter.on(delimiter); + return bytes -> splitToList(splitter, StringUtils.fromUtf8(bytes)); + } + } + /** * Copied from Guava's {@link Splitter#splitToList(CharSequence)}. * This is to avoid the error of the missing method signature when using an old Guava library. diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java index d6f817d6fa1..227edffbcb0 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java @@ -22,19 +22,24 @@ package org.apache.druid.data.input.impl; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.ListBasedInputRow; import org.apache.druid.data.input.TextReader; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParserUtils; import org.apache.druid.java.util.common.parsers.Parsers; 
+import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,18 +47,31 @@ import java.util.Map; /** * DelimitedValueReader is the reader for Delimitor Separate Value format input data(CSV/TSV). */ -public class DelimitedValueReader extends TextReader +public class DelimitedValueReader extends TextReader.Bytes { private final boolean findColumnsFromHeader; private final int skipHeaderRows; private final Function multiValueFunction; private final DelimitedValueParser parser; + + /** + * Signature of the delimited files. Written by {@link #setSignature(List)}. + */ @Nullable - private List columns; + private RowSignature inputRowSignature; + + /** + * Dimensions list used for generating {@link InputRow}. Derived from {@link #getInputRowSchema()}, but has + * dimensions locked-in based on the {@link #inputRowSignature}, so they do not need to be recalculated on each row. + * Written by {@link #setSignature(List)}. + */ + @Nullable + private List inputRowDimensions; + private final boolean useListBasedInputRows; interface DelimitedValueParser { - List parseLine(String line) throws IOException; + List parseLine(byte[] line) throws IOException; } DelimitedValueReader( @@ -63,7 +81,8 @@ public class DelimitedValueReader extends TextReader @Nullable List columns, boolean findColumnsFromHeader, int skipHeaderRows, - DelimitedValueParser parser + DelimitedValueParser parser, + boolean useListBasedInputRows ) { super(inputRowSchema, source); @@ -71,28 +90,58 @@ public class DelimitedValueReader extends TextReader this.skipHeaderRows = skipHeaderRows; final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); - this.columns = findColumnsFromHeader ? 
null : columns; // columns will be overriden by header row + + if (!findColumnsFromHeader && columns != null) { + // If findColumnsFromHeader, inputRowSignature will be set later. + setSignature(columns); + } + this.parser = parser; + this.useListBasedInputRows = useListBasedInputRows; } @Override - public List parseInputRows(String line) throws IOException, ParseException + public List parseInputRows(byte[] line) throws IOException, ParseException { - final Map zipped = parseLine(line); - return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); + if (useListBasedInputRows) { + final List parsed = readLineAsList(line); + return Collections.singletonList( + ListBasedInputRow.parse( + inputRowSignature, + getInputRowSchema().getTimestampSpec(), + inputRowDimensions, + parsed + ) + ); + } else { + final Map zipped = readLineAsMap(line); + return Collections.singletonList( + MapInputRowParser.parse( + getInputRowSchema().getTimestampSpec(), + inputRowDimensions, + zipped + ) + ); + } } @Override - public List> toMap(String intermediateRow) throws IOException + public List> toMap(byte[] intermediateRow) throws IOException { - return Collections.singletonList(parseLine(intermediateRow)); + return Collections.singletonList(readLineAsMap(intermediateRow)); } - private Map parseLine(String line) throws IOException + private List readLineAsList(byte[] line) throws IOException + { + final List parsed = parser.parseLine(line); + return new ArrayList<>(Lists.transform(parsed, multiValueFunction)); + } + + private Map readLineAsMap(byte[] line) throws IOException { final List parsed = parser.parseLine(line); return Utils.zipMapPartial( - Preconditions.checkNotNull(columns, "columns"), + Preconditions.checkNotNull(inputRowSignature, "inputRowSignature").getColumnNames(), Iterables.transform(parsed, multiValueFunction) ); } @@ -110,14 +159,31 @@ public class DelimitedValueReader extends TextReader } @Override - public void processHeaderLine(String 
line) throws IOException + public void processHeaderLine(byte[] line) throws IOException { if (!findColumnsFromHeader) { throw new ISE("Don't call this if findColumnsFromHeader = false"); } - columns = findOrCreateColumnNames(parser.parseLine(line)); - if (columns.isEmpty()) { + setSignature(parser.parseLine(line)); + } + + /** + * Set {@link #inputRowDimensions} and {@link #inputRowSignature} based on a set of header columns. Must be called + * prior to {@link #parseInputRows(byte[])}. + * + * @param columns header columns + */ + private void setSignature(final List columns) + { + inputRowSignature = findOrCreateInputRowSignature(columns); + if (inputRowSignature.size() == 0) { throw new ISE("Empty columns"); } + + inputRowDimensions = MapInputRowParser.findDimensions( + getInputRowSchema().getTimestampSpec(), + getInputRowSchema().getDimensionsSpec(), + ImmutableSet.copyOf(inputRowSignature.getColumnNames()) + ); } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FastLineIterator.java b/processing/src/main/java/org/apache/druid/data/input/impl/FastLineIterator.java index dd593851f06..9d2afc55bcc 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/FastLineIterator.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/FastLineIterator.java @@ -20,64 +20,77 @@ package org.apache.druid.data.input.impl; import com.google.common.base.Preconditions; -import it.unimi.dsi.fastutil.bytes.ByteArrayList; +import com.google.common.primitives.Longs; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.NoSuchElementException; /** * Like the Apache Commons LineIterator, but faster. + * + * Use {@link Strings} or {@link Bytes} as appropriate. 
Bytes is faster. */ -public class FastLineIterator implements CloseableIterator +public abstract class FastLineIterator implements CloseableIterator { // visible for tests static final int BUFFER_SIZE = 512; - private static final ThreadLocal BUFFER_LOCAL = ThreadLocal.withInitial(() -> new byte[BUFFER_SIZE]); - private static final byte CR = (byte) '\r'; private static final byte LF = (byte) '\n'; private final InputStream source; - private final ByteArrayList buffer; + private ByteBuffer buffer; + private int lineStart = 0; + private int position = 0; + private int limit = 0; + private boolean endOfStream; + private T nextLine; - private String nextLine; + /** + * {@link #LF} in every byte. + */ + private static final long LF_REPEAT = firstOccurrencePattern(LF); /** * Constructor; a local buffer will be created + * * @param source */ - public FastLineIterator(InputStream source) + protected FastLineIterator(InputStream source) { - this(source, new ByteArrayList()); + this(source, new byte[BUFFER_SIZE]); } /** * Constructor; BYO buffer. * * Existing contents of the buffer will be destroyed. 
+ * * @param source * @param buffer a buffer used for between-read calls */ - public FastLineIterator(InputStream source, ByteArrayList buffer) + protected FastLineIterator(InputStream source, byte[] buffer) { Preconditions.checkNotNull(source); Preconditions.checkNotNull(buffer); this.source = source; this.nextLine = null; - this.buffer = buffer; - this.buffer.size(0); + setBuffer(buffer); } @Override public void close() throws IOException { nextLine = null; + buffer = null; source.close(); - // Note: do not remove the thread local buffer; retain it for reuse later } @Override @@ -88,76 +101,188 @@ public class FastLineIterator implements CloseableIterator return true; } - readNextLine(); + nextLine = readNextLine(); return nextLine != null; } @Override - public String next() + public T next() { if (!hasNext()) { throw new NoSuchElementException("no more lines"); } - String result = nextLine; + T result = nextLine; nextLine = null; return result; } - void readNextLine() + private T readNextLine() { - byte[] load = BUFFER_LOCAL.get(); - - boolean endOfFile = false; - - // load data until finished or found a line feed - int indexOfLf = buffer.indexOf(LF); - while (!endOfFile && indexOfLf < 0) { - int readCount; - - try { - readCount = source.read(load); - } - catch (IOException e) { - nextLine = null; - throw new IllegalStateException(e); - } - - if (readCount < 0) { - endOfFile = true; + while (true) { + // See if there's another line already available in the buffer. + if (endOfStream) { + return readLineFromBuffer(); } else { - int sizeBefore = buffer.size(); - buffer.addElements(buffer.size(), load, 0, readCount); + // Look for next LF with 8-byte stride. 
+ while (position < limit - Long.BYTES) { + final long w = buffer.getLong(position); + final int index = firstOccurrence(w, LF_REPEAT); + if (index < Long.BYTES) { + position = position + index; + return readLineFromBuffer(); + } else { + position += Long.BYTES; + } + } - // check if there were any LFs in the newly collected data - for (int i = 0; i < readCount; i++) { - if (load[i] == LF) { - indexOfLf = sizeBefore + i; - break; + // Look for next LF with 1-byte stride. + for (; position < limit; position++) { + if (buffer.get(position) == LF) { + return readLineFromBuffer(); } } } + + // No line available in the buffer. + // Ensure space exists to read at least one more byte. + final int available = buffer.capacity() - limit; + + if (available == 0) { + final int currentLength = limit - lineStart; + + if (lineStart == 0) { + // Allocate a larger buffer. + final byte[] newBuf = new byte[buffer.capacity() * 2]; + System.arraycopy(buffer.array(), lineStart, newBuf, 0, currentLength); + setBuffer(newBuf); + } else { + // Move current line to the start of the existing buffer. + System.arraycopy(buffer.array(), lineStart, buffer.array(), 0, currentLength); + } + + position -= lineStart; + limit -= lineStart; + lineStart = 0; + } + + // Read as much as we can. + try { + final int bytesRead = source.read(buffer.array(), limit, buffer.capacity() - limit); + if (bytesRead < 0) { + // End of stream. 
+ endOfStream = true; + return readLineFromBuffer(); + } + + limit += bytesRead; + } + catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + + @Nullable + private T readLineFromBuffer() + { + for (; position < limit && buffer.get(position) != LF; position++) { + // Skip to limit or next LF } - if (endOfFile && buffer.size() == 0) { - // empty line and end of file - nextLine = null; + final boolean isLf = position < limit && buffer.get(position) == LF; - } else if (indexOfLf < 0) { - // no LF at all; end of input - nextLine = StringUtils.fromUtf8(buffer); - buffer.removeElements(0, buffer.size()); - - } else if (indexOfLf >= 1 && buffer.getByte(indexOfLf - 1) == CR) { + final int lineEnd; + if (isLf && position > lineStart && buffer.get(position - 1) == CR) { // CR LF - nextLine = StringUtils.fromUtf8(buffer.elements(), 0, indexOfLf - 1); - buffer.removeElements(0, indexOfLf + 1); - - } else { + lineEnd = position - 1; + } else if (isLf) { // LF only - nextLine = StringUtils.fromUtf8(buffer.elements(), 0, indexOfLf); - buffer.removeElements(0, indexOfLf + 1); + lineEnd = position; + } else if (endOfStream) { + // End of stream + lineEnd = position; + } else { + // There wasn't a line after all + throw DruidException.defensive("No line to read"); + } + + if (lineStart == limit && endOfStream) { + // null signifies no more lines are coming. + return null; + } + + final T retVal = makeObject(buffer.array(), lineStart, lineEnd - lineStart); + if (position < limit) { + position++; + } + lineStart = position; + return retVal; + } + + protected abstract T makeObject(byte[] bytes, int offset, int length); + + private void setBuffer(final byte[] buffer) + { + this.buffer = ByteBuffer.wrap(buffer).order(ByteOrder.BIG_ENDIAN); + } + + /** + * Find the first {@link #LF} byte in a long. Returns 8 if there are no {@link #LF} bytes. + * + * @param n input long + * @param pattern pattern generated by repeating the byte 8 times. 
Use + * {@link #firstOccurrencePattern(byte)} to compute. + */ + private static int firstOccurrence(long n, long pattern) + { + // Xor with LF_REPEAT to turn LF bytes into zero-bytes. + final long xored = n ^ pattern; + + // Apply test from https://graphics.stanford.edu/~seander/bithacks.html#ValueInWord, which zeroes out all + // non-zero bytes, and sets the high bits for bytes that were zero. + final long zeroTest = (((xored - 0x0101010101010101L) & ~(xored) & 0x8080808080808080L)); + + // Count number of leading zeroes, which will be a multiple of 8, then divide by 8. + return Long.numberOfLeadingZeros(zeroTest) >>> 3; + } + + /** + * Generate a search pattern for {@link #firstOccurrence(long, long)}. + */ + private static long firstOccurrencePattern(final byte b) + { + return Longs.fromBytes(b, b, b, b, b, b, b, b); + } + + public static class Strings extends FastLineIterator + { + public Strings(final InputStream source) + { + super(source); + } + + @Override + protected String makeObject(byte[] bytes, int offset, int length) + { + return StringUtils.fromUtf8(bytes, offset, length); + } + } + + public static class Bytes extends FastLineIterator + { + public Bytes(final InputStream source) + { + super(source); + } + + @Override + protected byte[] makeObject(byte[] bytes, int offset, int length) + { + final byte[] retVal = new byte[length]; + System.arraycopy(bytes, offset, retVal, 0, length); + return retVal; } } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java index 000ce65dae5..8e8e052cd00 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -23,7 +23,15 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import 
com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.ListBasedInputRow; +import org.apache.druid.data.input.ListBasedInputRowAdapter; +import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.RowAdapters; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; @@ -123,6 +131,30 @@ public abstract class FlatTextInputFormat implements InputFormat return skipHeaderRows; } + @Override + public long getWeightedSize(String path, long size) + { + CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); + if (CompressionUtils.Format.GZ == compressionFormat) { + return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; + } + return size; + } + + @Override + public RowAdapter createRowAdapter(InputRowSchema inputRowSchema) + { + if (useListBasedInputRows()) { + final RowSignature.Builder builder = RowSignature.builder(); + for (final String column : columns) { + builder.add(column, null); + } + return new ListBasedInputRowAdapter(builder.build()); + } else { + return RowAdapters.standardRow(); + } + } + @Override public boolean equals(Object o) { @@ -140,31 +172,30 @@ public abstract class FlatTextInputFormat implements InputFormat Objects.equals(delimiter, that.delimiter); } - @Override - public long getWeightedSize(String path, long size) - { - CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); - if (CompressionUtils.Format.GZ == compressionFormat) { - return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; - } - return size; - } - @Override public int hashCode() { return Objects.hash(listDelimiter, columns, findColumnsFromHeader, 
skipHeaderRows, delimiter); } + /** + * Whether this format can use {@link ListBasedInputRow}, which are preferred over {@link MapBasedInputRow} when + * possible. Subclasses pass this to {@link DelimitedValueReader}. + */ + protected boolean useListBasedInputRows() + { + return !columns.isEmpty() && !findColumnsFromHeader; + } + protected String fieldsToString() { return "FlatTextInputFormat{" - + "delimiter=\"" + delimiter - + "\"listDelimiter=" - + listDelimiter == null ? "null" : "\"" + listDelimiter + "\"" - + ", findColumnsFromHeader=" + findColumnsFromHeader - + ", skipHeaderRows=" + skipHeaderRows - + ", columns=" + columns - + "}"; + + "delimiter=\"" + delimiter + + "\"listDelimiter=" + + (listDelimiter == null ? "null" : "\"" + listDelimiter + "\"") + + ", findColumnsFromHeader=" + findColumnsFromHeader + + ", skipHeaderRows=" + skipHeaderRows + + ", columns=" + columns + + "}"; } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java index a0479c181e9..cf88d273dd1 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.RowAdapter; import java.io.File; import java.io.IOException; @@ -105,6 +106,12 @@ public class InputEntityIteratingReader implements InputSourceReader }); } + @Override + public RowAdapter rowAdapter() + { + return inputFormat.createRowAdapter(inputRowSchema); + } + private CloseableIterator createIterator(Function> rowPopulator) { return 
sourceIterator.flatMap(rowPopulator); diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java index 02d16ffc9f5..d12f38ab97e 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java @@ -45,7 +45,7 @@ import java.util.Map; * This also means that each text line should be a well-formed JSON text, pretty-printed format is not allowed * */ -public class JsonLineReader extends TextReader +public class JsonLineReader extends TextReader.Bytes { private final ObjectFlattener flattener; private final ObjectMapper mapper; @@ -70,7 +70,7 @@ public class JsonLineReader extends TextReader } @Override - public List parseInputRows(String line) throws IOException, ParseException + public List parseInputRows(byte[] line) throws IOException, ParseException { final JsonNode document = mapper.readValue(line, JsonNode.class); final Map flattened = flattener.flatten(document); @@ -78,7 +78,7 @@ public class JsonLineReader extends TextReader } @Override - public List> toMap(String intermediateRow) throws IOException + public List> toMap(byte[] intermediateRow) throws IOException { //noinspection unchecked return Collections.singletonList(mapper.readValue(intermediateRow, Map.class)); @@ -97,7 +97,7 @@ public class JsonLineReader extends TextReader } @Override - public void processHeaderLine(String line) + public void processHeaderLine(byte[] line) { // do nothing } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java index fcd68741f1a..b73190e1333 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java @@ -37,6 
+37,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; public class MapInputRowParser implements InputRowParser> { @@ -80,8 +81,17 @@ public class MapInputRowParser implements InputRowParser> theMap == null ? Collections.emptySet() : theMap.keySet() ); + return parse(timestampSpec, dimensionsToUse, theMap); + } + + public static InputRow parse( + TimestampSpec timestampSpec, + List dimensions, + Map theMap + ) throws ParseException + { final DateTime timestamp = parseTimestamp(timestampSpec, theMap); - return new MapBasedInputRow(timestamp, dimensionsToUse, theMap); + return new MapBasedInputRow(timestamp, dimensions, theMap); } /** @@ -89,11 +99,11 @@ public class MapInputRowParser implements InputRowParser> * There are 3 cases here. * * 1) If {@link DimensionsSpec#isIncludeAllDimensions()} is set, the returned list includes _both_ - * {@link DimensionsSpec#getDimensionNames()} and the dimensions in the given map ({@code rawInputRow#keySet()}). + * {@link DimensionsSpec#getDimensionNames()} and the dimensions in the given map ({@code rawInputRow#keySet()}). * 2) If isIncludeAllDimensions is not set and {@link DimensionsSpec#getDimensionNames()} is not empty, - * the dimensions in dimensionsSpec is returned. + * the dimensions in dimensionsSpec is returned. * 3) If isIncludeAllDimensions is not set and {@link DimensionsSpec#getDimensionNames()} is empty, - * the dimensions in the given map is returned. + * the dimensions in the given map is returned. * * In any case, the returned list does not include any dimensions in {@link DimensionsSpec#getDimensionExclusions()} * or {@link TimestampSpec#getTimestampColumn()}. 
@@ -132,32 +142,54 @@ public class MapInputRowParser implements InputRowParser> } public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map theMap) + { + return parseTimestampOrThrowParseException( + timestampSpec.getRawTimestamp(theMap), + timestampSpec, + () -> theMap + ); + } + + /** + * Given a plain Java Object, extract a timestamp from it using the provided {@link TimestampSpec}, or throw + * a {@link ParseException} if we can't. + * + * @param timeValue object to interpret as timestamp + * @param timestampSpec timestamp spec + * @param rawMapSupplier supplier of the original raw data that this object came from. Used for error messages. + */ + public static DateTime parseTimestampOrThrowParseException( + final Object timeValue, + final TimestampSpec timestampSpec, + final Supplier> rawMapSupplier + ) { final DateTime timestamp; + try { - timestamp = timestampSpec.extractTimestamp(theMap); + timestamp = timestampSpec.parseDateTime(timeValue); } catch (Exception e) { - String rawMap = rawMapToPrint(theMap); + String rawMap = rawMapToPrint(rawMapSupplier.get()); throw new ParseException( rawMap, e, "Timestamp[%s] is unparseable! Event: %s", - timestampSpec.getRawTimestamp(theMap), + timeValue, rawMap ); } if (timestamp == null) { - String rawMap = rawMapToPrint(theMap); + String rawMap = rawMapToPrint(rawMapSupplier.get()); throw new ParseException( rawMap, "Timestamp[%s] is unparseable! 
Event: %s", - timestampSpec.getRawTimestamp(theMap), + timeValue, rawMap ); } if (!Intervals.ETERNITY.contains(timestamp)) { - String rawMap = rawMapToPrint(theMap); + String rawMap = rawMapToPrint(rawMapSupplier.get()); throw new ParseException( rawMap, "Encountered row with timestamp[%s] that cannot be represented as a long: [%s]", @@ -169,7 +201,7 @@ public class MapInputRowParser implements InputRowParser> } @Nullable - private static String rawMapToPrint(@Nullable Map rawMap) + private static String rawMapToPrint(@Nullable Map rawMap) { if (rawMap == null) { return null; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/RegexReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/RegexReader.java index 876ef98cbca..aabf4fe1603 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/RegexReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/RegexReader.java @@ -39,7 +39,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class RegexReader extends TextReader +public class RegexReader extends TextReader.Strings { private final String pattern; private final Pattern compiledPattern; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceReader.java index 85e3780ac38..3438abb02e2 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceReader.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.RowAdapter; import org.joda.time.DateTime; import 
java.io.IOException; @@ -135,4 +136,10 @@ public class TimedShutoffInputSourceReader implements InputSourceReader return wrappingIterator; } + + @Override + public RowAdapter rowAdapter() + { + return delegate.rowAdapter(); + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java index e86d51b8d96..9759f163915 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.common; import com.google.common.base.Strings; -import it.unimi.dsi.fastutil.bytes.ByteArrayList; import org.apache.commons.io.IOUtils; import javax.annotation.Nonnull; @@ -261,11 +260,6 @@ public class StringUtils return StringUtils.fromUtf8(buffer, buffer.remaining()); } - public static String fromUtf8(final ByteArrayList buffer) - { - return StringUtils.fromUtf8(buffer.elements(), 0, buffer.size()); - } - /** * If buffer is Decodes a UTF-8 string from the remaining bytes of a buffer. * Advances the position of the buffer by {@link ByteBuffer#remaining()}. 
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java b/processing/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java index d83c8280f9c..1826b73857a 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java @@ -100,7 +100,7 @@ public abstract class AbstractFlatTextFormatParser implements Parser fieldNames) { if (fieldNames != null) { - this.fieldNames = TextReader.findOrCreateColumnNames(Lists.newArrayList(fieldNames)); + this.fieldNames = TextReader.findOrCreateInputRowSignature(Lists.newArrayList(fieldNames)).getColumnNames(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index eca175267a7..0f1e0eca4fc 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -32,6 +32,7 @@ import com.google.common.primitives.Longs; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.ListBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionSchema; @@ -684,11 +685,13 @@ public abstract class IncrementalIndex implements Iterable, Closeable, Colu return ((MapBasedInputRow) inputRow).getEvent().toString(); } + if (inputRow instanceof ListBasedInputRow) { + return ((ListBasedInputRow) inputRow).asMap().toString(); + } + if (inputRow instanceof TransformedInputRow) { InputRow innerRow = ((TransformedInputRow) 
inputRow).getBaseRow(); - if (innerRow instanceof MapBasedInputRow) { - return ((MapBasedInputRow) innerRow).getEvent().toString(); - } + return getSimplifiedEventStringFromRow(innerRow); } return inputRow.toString(); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java index c1faa274845..ae8c8709a1d 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java @@ -20,17 +20,17 @@ package org.apache.druid.data.input.impl; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.ListBasedInputRow; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.column.RowSignature; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -163,59 +163,63 @@ public class CsvReaderTest "65,Here I write \\n slash n,2018-05-09T10:00:00Z" ) ); + final RowSignature signature = + RowSignature.builder() + .add("Value", null) + .add("Comment", null) + .add("Timestamp", null) + .build(); + final List expectedResults = ImmutableList.of( - new MapBasedInputRow( + new ListBasedInputRow( + signature, DateTimes.of("2018-05-05T10:00:00Z"), ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", + ImmutableList.of( "3", - "Comment", "Lets do some \"normal\" quotes", - 
"Timestamp", "2018-05-05T10:00:00Z" ) ), - new MapBasedInputRow( + new ListBasedInputRow( + signature, DateTimes.of("2018-05-06T10:00:00Z"), ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", + ImmutableList.of( "34", - "Comment", "Lets do some \"normal\", quotes with comma", - "Timestamp", "2018-05-06T10:00:00Z" ) ), - new MapBasedInputRow( + new ListBasedInputRow( + signature, DateTimes.of("2018-05-07T10:00:00Z"), ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", + ImmutableList.of( "343", - "Comment", "Lets try \\\"it\\\" with slash quotes", - "Timestamp", "2018-05-07T10:00:00Z" ) ), - new MapBasedInputRow( + new ListBasedInputRow( + signature, DateTimes.of("2018-05-08T10:00:00Z"), ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", + ImmutableList.of( "545", - "Comment", "Lets try \\\"it\\\", with slash quotes and comma", - "Timestamp", "2018-05-08T10:00:00Z" ) ), - new MapBasedInputRow( + new ListBasedInputRow( + signature, DateTimes.of("2018-05-09T10:00:00Z"), ImmutableList.of("Timestamp"), - ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") + ImmutableList.of( + "65", + "Here I write \\n slash n", + "2018-05-09T10:00:00Z" + ) ) ); final CsvInputFormat format = new CsvInputFormat( diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedBytesTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedBytesTest.java new file mode 100644 index 00000000000..10982d63278 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/impl/DelimitedBytesTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +public class DelimitedBytesTest extends InitializedNullHandlingTest +{ + private static final byte TSV = (byte) '\t'; + + @Test + public void testEmpty() + { + Assert.assertEquals( + Collections.singletonList(NullHandling.sqlCompatible() ? null : ""), + DelimitedBytes.split(new byte[0], TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT) + ); + } + + @Test + public void testNoDelimiter() + { + Assert.assertEquals( + Collections.singletonList("abc"), + DelimitedBytes.split(StringUtils.toUtf8("abc"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT) + ); + } + + @Test + public void testOneDelimiter() + { + Assert.assertEquals( + Arrays.asList("a", "bc"), + DelimitedBytes.split(StringUtils.toUtf8("a\tbc"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT) + ); + } + + @Test + public void testDelimiterAtStart() + { + Assert.assertEquals( + Arrays.asList(NullHandling.sqlCompatible() ? 
null : "", "abc"), + DelimitedBytes.split(StringUtils.toUtf8("\tabc"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT) + ); + } + + @Test + public void testDelimiterAtEnd() + { + Assert.assertEquals( + Arrays.asList("a", "bc", NullHandling.sqlCompatible() ? null : ""), + DelimitedBytes.split(StringUtils.toUtf8("a\tbc\t"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT) + ); + } + + @Test + public void testMoreFieldsThanHint() + { + Assert.assertEquals( + Arrays.asList("a", "b", "c"), + DelimitedBytes.split(StringUtils.toUtf8("a\tb\tc"), TSV, 1) + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/FastLineIteratorTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/FastLineIteratorTest.java index 1887510744f..227ab2039fd 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/FastLineIteratorTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/FastLineIteratorTest.java @@ -47,14 +47,14 @@ public class FastLineIteratorTest { expectedException.expect(NullPointerException.class); //noinspection ResultOfObjectAllocationIgnored - new FastLineIterator(null); + new FastLineIterator.Strings(null); } @Test public void testEmptyInput() { byte[] input = new byte[0]; - FastLineIterator iterator = new FastLineIterator(new ByteArrayInputStream(input)); + FastLineIterator iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertFalse(iterator.hasNext()); @@ -66,12 +66,12 @@ public class FastLineIteratorTest public void testSoloCr() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; // a single \r // it is expected that this emits a complete line with \r since a return on its own is not a line break input = "\r".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("\r", 
iterator.next()); @@ -82,12 +82,12 @@ public class FastLineIteratorTest public void testSoloLf() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; // a single \n // should emit a single complete 'line' as "", and no trailing line (since EOF) input = "\n".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("", iterator.next()); @@ -98,13 +98,13 @@ public class FastLineIteratorTest public void testBackwardsLfCr() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; // should emit two lines: // first one is an empty line for before the \n, // second is the \r alone input = "\n\r".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("", iterator.next()); @@ -117,11 +117,11 @@ public class FastLineIteratorTest public void testForwardsSoloCrLf() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; // should emit one (empty) line input = "\r\n".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("", iterator.next()); @@ -132,11 +132,11 @@ public class FastLineIteratorTest public void testSingleLine() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; // without an end input = "abcd".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("abcd", iterator.next()); 
@@ -144,7 +144,7 @@ public class FastLineIteratorTest // with an end input = "abcd\n".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("abcd", iterator.next()); @@ -152,7 +152,7 @@ public class FastLineIteratorTest // with an end input = "abcd\r\n".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("abcd", iterator.next()); @@ -163,10 +163,10 @@ public class FastLineIteratorTest public void testMultipleLines() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; input = "abcd\ndefg\nhijk".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("abcd", iterator.next()); @@ -179,10 +179,10 @@ public class FastLineIteratorTest public void testEmptyMiddleLine() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; input = "abcd\n\nhijk\n".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("abcd", iterator.next()); @@ -195,10 +195,10 @@ public class FastLineIteratorTest public void testEmptyLastLine() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; input = "abcd\ndefg\nhijk\n".getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); 
Assert.assertTrue(iterator.hasNext()); Assert.assertEquals("abcd", iterator.next()); @@ -211,14 +211,14 @@ public class FastLineIteratorTest public void testOverlappingBuffer() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; - String line1 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE - 20); - String line2 = RandomStringUtils.random(40); - String line3 = RandomStringUtils.random(20); + String line1 = randomString(FastLineIterator.BUFFER_SIZE - 20); + String line2 = randomString(40); + String line3 = randomString(20); input = (line1 + "\n" + line2 + "\n" + line3 + "\n").getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals(line1, iterator.next()); @@ -231,15 +231,15 @@ public class FastLineIteratorTest public void testLineLargerThanBufferSize() { byte[] input; - FastLineIterator iterator; + FastLineIterator iterator; // random lengths that force multiple buffer trips - String line1 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE * 3 + 10); - String line2 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE * 2 + 15); - String line3 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE + 9); + String line1 = randomString(FastLineIterator.BUFFER_SIZE * 3 + 10); + String line2 = randomString(FastLineIterator.BUFFER_SIZE * 2 + 15); + String line3 = randomString(FastLineIterator.BUFFER_SIZE + 9); input = (line1 + "\r\n" + line2 + "\r\n" + line3 + "\r\n").getBytes(StandardCharsets.UTF_8); - iterator = new FastLineIterator(new ByteArrayInputStream(input)); + iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input)); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals(line1, iterator.next()); @@ -247,4 +247,14 @@ public class FastLineIteratorTest Assert.assertEquals(line3, iterator.next()); 
Assert.assertFalse(iterator.hasNext()); } + + /** + * Random string that does not contain \r or \n. + */ + private static String randomString(final int length) + { + return RandomStringUtils.random(length) + .replace('\r', '?') + .replace('\n', '?'); + } }