Faster parsing: reduce String usage, list-based input rows. (#15681)

* Faster parsing: reduce String usage, list-based input rows.

Three changes:

1) Reworked FastLineIterator to optionally avoid generating Strings
   entirely, and reduce copying somewhat. Benefits the line-oriented
   JSON, CSV, delimited (TSV), and regex formats.

2) In the delimited (TSV) format, when the delimiter is a single byte,
   split on UTF-8 bytes directly.

3) In CSV and delimited (TSV) formats, use list-based input rows when
   the column list is provided upfront by the user (see the sketch below).
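
For illustration of (2) and (3): both fast paths engage when the column
list is fixed upfront. A minimal sketch, mirroring the constructor call in
DelimitedInputFormatBenchmark below (the column names are placeholders):

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.data.input.impl.DelimitedInputFormat;

    // Columns supplied and findColumnsFromHeader = false, so the format's
    // useListBasedInputRows() is true and its readers emit list-based rows.
    // The single-byte delimiter "\t" also selects the byte-level split path.
    DelimitedInputFormat format = new DelimitedInputFormat(
        ImmutableList.of("timestamp", "page", "added"), // columns known upfront
        null,   // listDelimiter (default)
        "\t",   // delimiter
        null,   // hasHeaderRow (null, as in the benchmark)
        false,  // findColumnsFromHeader
        0       // skipHeaderRows
    );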

* Fix style.

* Fix inspections.

* Restore validation.

* Remove fastutil-extra.

* Exception type.

* Fixes for error messages.

* Fixes for null handling.
Gian Merlino 2024-01-18 03:18:46 -08:00 committed by GitHub
parent 55acf2e2ff
commit d3d0c1c91e
30 changed files with 1178 additions and 210 deletions

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DelimitedInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@Fork(value = 1)
public class DelimitedInputFormatBenchmark
{
private static final int NUM_EVENTS = 1200;
private static final List<String> COLUMNS =
ImmutableList.of(
"timestamp",
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
"added",
"deleted",
"delta"
);
static {
NullHandling.initializeForTests();
}
@Param({"false", "true"})
private boolean fromHeader;
InputEntityReader reader;
DelimitedInputFormat format;
byte[] data;
@Setup(Level.Invocation)
public void prepareReader()
{
ByteEntity source = new ByteEntity(data);
reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
DimensionsSpec.builder().setIncludeAllDimensions(true).build(),
ColumnsFilter.all()
),
source,
null
);
}
@Setup(Level.Trial)
public void prepareData() throws Exception
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String headerString =
"timestamp\tpage\tlanguage\tuser\tunpatrolled\tnewPage\trobot\tanonymous\tnamespace\tcontinent\tcountry\tregion\tcity\tadded\tdeleted\tdelta\n";
final String dataString =
"2013-08-31T01:02:33Z\tGypsy Danger\ten\tnuclear\ttrue\ttrue\tfalse\tfalse\tarticle\tNorth America\tUnited States\tBay Area\tSan Francisco\t57\t200\t-143\n"
+ "2013-08-31T03:32:45Z\tStriker Eureka\ten\tspeed\tfalse\ttrue\ttrue\tfalse\twikipedia\tAustralia\tAustralia\tCantebury\tSyndey\t459\t129\t330\n"
+ "2013-08-31T07:11:21Z\tCherno Alpha\tru\tmasterYi\tfalse\ttrue\ttrue\tfalse\tarticle\tAsia\tRussia\tOblast\tMoscow\t123\t12\t111\n";
baos.write(StringUtils.toUtf8(headerString));
for (int i = 0; i < NUM_EVENTS / 3; i++) {
baos.write(StringUtils.toUtf8(dataString));
}
data = baos.toByteArray();
}
@Setup(Level.Trial)
public void prepareFormat()
{
format = new DelimitedInputFormat(fromHeader ? null : COLUMNS, null, "\t", null, fromHeader, fromHeader ? 0 : 1);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void baseline(final Blackhole blackhole) throws IOException
{
int counted = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
if (row != null) {
counted += 1;
}
blackhole.consume(row);
}
}
if (counted != NUM_EVENTS) {
throw new RuntimeException("invalid number of loops, counted = " + counted);
}
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(DelimitedInputFormatBenchmark.class.getSimpleName())
.build();
new Runner(opt).run();
}
}

View File

@ -25,6 +25,7 @@ import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.segment.RowAdapter;
import java.io.IOException;
@ -56,4 +57,10 @@ public class CountableInputSourceReader implements InputSourceReader
{
return inputSourceReader.sample();
}
@Override
public RowAdapter<InputRow> rowAdapter()
{
return inputSourceReader.rowAdapter();
}
}

View File

@ -29,7 +29,6 @@ import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
@ -142,7 +141,7 @@ public class ExternalSegment extends RowBasedSegment<InputRow>
}
}
),
RowAdapters.standardRow(),
reader.rowAdapter(),
signature
);
this.inputSource = inputSource;

View File

@ -1165,8 +1165,8 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
// the last row has parse exception when indexing, check if rawColumns and exception message match the expected
//
String indexParseExceptioMessage = ParserType.STR_CSV.equals(parserType)
? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"
: "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]";
? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row={timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"
: "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row={timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]";
assertEqualsSamplerResponseRow(
new SamplerResponseRow(
rawColumns4ParseExceptionRow,

View File

@ -910,11 +910,6 @@
<artifactId>fastutil-core</artifactId>
<version>${fastutil.version}</version>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-extra</artifactId>
<version>${fastutil.version}</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>

View File

@ -233,10 +233,6 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-extra</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>

View File

@ -30,6 +30,8 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.utils.CompressionUtils;
import java.io.File;
@ -85,4 +87,15 @@ public interface InputFormat
{
return size;
}
/**
* Returns an adapter that can read the rows from {@link #createReader(InputRowSchema, InputEntity, File)},
* given a particular {@link InputRowSchema}. Note that {@link RowAdapters#standardRow()} always works, but the
* one returned by this method may be more performant.
*/
@SuppressWarnings("unused") // inputRowSchema is currently unused, but may be used in the future for ColumnsFilter
default RowAdapter<InputRow> createRowAdapter(InputRowSchema inputRowSchema)
{
return RowAdapters.standardRow();
}
}
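
For context, a hedged sketch of how a caller can use this adapter. Here
"reader" is an InputSourceReader, "inputFormat" and "inputRowSchema" are
assumed to be in scope, and "page" is a placeholder column name:

    RowAdapter<InputRow> adapter = inputFormat.createRowAdapter(inputRowSchema);
    // Resolve the accessor functions once, outside the row loop, so per-row
    // work involves no field-name-to-index lookups.
    ToLongFunction<InputRow> timeFn = adapter.timestampFunction();
    Function<InputRow, Object> pageFn = adapter.columnFunction("page");
    try (CloseableIterator<InputRow> rows = reader.read()) {
      while (rows.hasNext()) {
        final InputRow row = rows.next();
        final long timestamp = timeFn.applyAsLong(row);
        final Object page = pageFn.apply(row); // positional get for list-based rows
      }
    }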

View File

@ -22,6 +22,8 @@ package org.apache.druid.data.input;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowAdapters;
import java.io.IOException;
@ -45,4 +47,12 @@ public interface InputSourceReader
CloseableIterator<InputRow> read(InputStats inputStats) throws IOException;
CloseableIterator<InputRowListPlusRawValues> sample() throws IOException;
/**
* Returns an adapter that can be used to read the rows from {@link #read()}.
*/
default RowAdapter<InputRow> rowAdapter()
{
return RowAdapters.standardRow();
}
}

View File

@ -73,11 +73,12 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
}
catch (IOException e) {
final Map<String, Object> metadata = intermediateRowIteratorWithMetadata.currentMetadata();
final String rowAsString = intermediateRowAsString(row);
rows = new ExceptionThrowingIterator(new ParseException(
String.valueOf(row),
rowAsString,
e,
buildParseExceptionMessage(
StringUtils.format("Unable to parse row [%s]", row),
StringUtils.format("Unable to parse row [%s]", rowAsString),
source(),
currentRecordNumber,
metadata
@ -166,10 +167,11 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
rawColumnsList = toMap(row);
}
catch (Exception e) {
final String rowAsString = intermediateRowAsString(row);
return InputRowListPlusRawValues.of(
null,
new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", row),
new ParseException(rowAsString, e, buildParseExceptionMessage(
StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", rowAsString),
source(),
null,
metadata
@ -178,10 +180,11 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
}
if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
final String rowAsString = intermediateRowAsString(row);
return InputRowListPlusRawValues.of(
null,
new ParseException(String.valueOf(row), buildParseExceptionMessage(
StringUtils.nonStrictFormat("No map object parsed for row [%s]", row),
new ParseException(rowAsString, buildParseExceptionMessage(
StringUtils.nonStrictFormat("No map object parsed for row [%s]", rowAsString),
source(),
null,
metadata
@ -195,14 +198,15 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
}
catch (ParseException e) {
return InputRowListPlusRawValues.ofList(rawColumnsList, new ParseException(
String.valueOf(row),
intermediateRowAsString(row),
e,
buildParseExceptionMessage(e.getMessage(), source(), null, metadata)
));
}
catch (IOException e) {
ParseException exception = new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", row),
final String rowAsString = intermediateRowAsString(row);
ParseException exception = new ParseException(rowAsString, e, buildParseExceptionMessage(
StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", rowAsString),
source(),
null,
metadata
@ -231,6 +235,14 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
return CloseableIteratorWithMetadata.withEmptyMetadata(intermediateRowIterator());
}
/**
* String representation of an intermediate row. Used for error messages.
*/
protected String intermediateRowAsString(@Nullable T row)
{
return String.valueOf(row);
}
/**
* @return InputEntity which the implementation is reading from. Useful in generating informative {@link ParseException}s.
* For example, in case of {@link org.apache.druid.data.input.impl.FileEntity}, file name containing erroneous records

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Input row backed by a {@link List}. This implementation is most useful when the columns and dimensions are known
* ahead of time, prior to encountering any data. It is used in concert with {@link ListBasedInputRowAdapter}, which
* enables reading field values by position rather than by name.
*/
public class ListBasedInputRow implements InputRow
{
private final RowSignature signature;
private final DateTime timestamp;
private final List<String> dimensions;
private final List<Object> data;
/**
* Create a row.
*
* @param signature signature; must match the "data" list
* @param timestamp row timestamp
* @param dimensions dimensions to be reported by {@link #getDimensions()}
* @param data row data
*/
public ListBasedInputRow(
final RowSignature signature,
final DateTime timestamp,
final List<String> dimensions,
final List<Object> data
)
{
this.signature = signature;
this.timestamp = timestamp;
this.dimensions = dimensions;
this.data = data;
}
/**
* Create a row and parse a timestamp. Throws {@link ParseException} if the timestamp cannot be parsed.
*
* @param signature signature; must match the "data" list
* @param timestampSpec timestamp spec
* @param dimensions dimensions to be reported by {@link #getDimensions()}
* @param data row data
*/
public static InputRow parse(
RowSignature signature,
TimestampSpec timestampSpec,
List<String> dimensions,
List<Object> data
)
{
final DateTime timestamp = parseTimestamp(timestampSpec, data, signature);
return new ListBasedInputRow(signature, timestamp, dimensions, data);
}
@Override
public List<String> getDimensions()
{
return dimensions;
}
@Override
public long getTimestampFromEpoch()
{
return timestamp.getMillis();
}
@Override
public DateTime getTimestamp()
{
return timestamp;
}
@Override
public List<String> getDimension(String dimension)
{
return Rows.objectToStrings(getRaw(dimension));
}
@Nullable
@Override
public Object getRaw(String columnName)
{
final int i = signature.indexOf(columnName);
if (i < 0 || i >= data.size()) {
return null;
} else {
return data.get(i);
}
}
@Nullable
public Object getRaw(int columnNumber)
{
if (columnNumber < data.size()) {
return data.get(columnNumber);
} else {
// Row may have ended early, which is OK.
return null;
}
}
@Nullable
@Override
public Number getMetric(String metric)
{
return Rows.objectToNumber(metric, getRaw(metric), true);
}
@Override
public int compareTo(Row o)
{
return timestamp.compareTo(o.getTimestamp());
}
public Map<String, Object> asMap()
{
return Utils.zipMapPartial(signature.getColumnNames(), data);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ListBasedInputRow that = (ListBasedInputRow) o;
return Objects.equals(dimensions, that.dimensions)
&& Objects.equals(signature, that.signature)
&& Objects.equals(data, that.data);
}
@Override
public int hashCode()
{
return Objects.hash(dimensions, signature, data);
}
@Override
public String toString()
{
return "{" +
"timestamp=" + DateTimes.utc(getTimestampFromEpoch()) +
", event=" + asMap() +
", dimensions=" + dimensions +
'}';
}
/**
* Helper for {@link #parse(RowSignature, TimestampSpec, List, List)}.
*/
private static DateTime parseTimestamp(
final TimestampSpec timestampSpec,
final List<Object> theList,
final RowSignature signature
)
{
final int timeColumnIndex = signature.indexOf(timestampSpec.getTimestampColumn());
final Object timeValue;
if (theList != null && timeColumnIndex >= 0 && timeColumnIndex < theList.size()) {
timeValue = theList.get(timeColumnIndex);
} else {
timeValue = null;
}
return MapInputRowParser.parseTimestampOrThrowParseException(
timeValue,
timestampSpec,
() -> Utils.zipMapPartial(signature.getColumnNames(), theList)
);
}
}
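
A minimal construction sketch for this class (the values echo the benchmark
data earlier in this commit; the single "page" dimension is illustrative):

    final RowSignature signature = RowSignature.builder()
        .add("timestamp", null)
        .add("page", null)
        .add("added", null)
        .build();
    final InputRow row = ListBasedInputRow.parse(
        signature,
        new TimestampSpec("timestamp", "iso", null),
        ImmutableList.of("page"),                                  // reported dimensions
        Arrays.asList("2013-08-31T01:02:33Z", "Gypsy Danger", 57)  // positional values
    );
    // row.getRaw("page") resolves the name to index 1 via the signature and
    // returns data.get(1); row.getDimension("page") yields ["Gypsy Danger"].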

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.column.RowSignature;
import java.util.function.Function;
import java.util.function.ToLongFunction;
/**
* Adapter for reading {@link ListBasedInputRow}. {@link RowAdapters#standardRow()} would also work, but this
* one is faster because it avoids per-row field-name-to-index lookups. Must be instantiated with the same
* {@link RowSignature} as the {@link ListBasedInputRow}s that are to be read.
*/
public class ListBasedInputRowAdapter implements RowAdapter<InputRow>
{
private final RowSignature fields;
public ListBasedInputRowAdapter(final RowSignature fields)
{
this.fields = fields;
}
@Override
public ToLongFunction<InputRow> timestampFunction()
{
return Row::getTimestampFromEpoch;
}
@Override
public Function<InputRow, Object> columnFunction(String columnName)
{
final int i = fields.indexOf(columnName);
if (i < 0) {
return row -> null;
} else {
// Get by column number, not name.
return row -> ((ListBasedInputRow) row).getRaw(i);
}
}
}

View File

@ -86,7 +86,7 @@ public class MapBasedInputRow extends MapBasedRow implements InputRow
@Override
public String toString()
{
return "MapBasedInputRow{" +
return "{" +
"timestamp=" + DateTimes.utc(getTimestampFromEpoch()) +
", event=" + getEvent() +
", dimensions=" + dimensions +

View File

@ -19,14 +19,17 @@
package org.apache.druid.data.input;
import com.google.common.base.Strings;
import org.apache.druid.data.input.impl.FastLineIterator;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -35,12 +38,12 @@ import java.util.Map;
/**
* Abstract {@link InputEntityReader} for text format readers such as CSV or JSON.
*/
public abstract class TextReader extends IntermediateRowParsingReader<String>
public abstract class TextReader<T> extends IntermediateRowParsingReader<T>
{
private final InputRowSchema inputRowSchema;
private final InputEntity source;
public TextReader(InputRowSchema inputRowSchema, InputEntity source)
protected TextReader(InputRowSchema inputRowSchema, InputEntity source)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
@ -52,9 +55,9 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
}
@Override
public CloseableIteratorWithMetadata<String> intermediateRowIteratorWithMetadata() throws IOException
public CloseableIteratorWithMetadata<T> intermediateRowIteratorWithMetadata() throws IOException
{
final CloseableIterator<String> delegate = new FastLineIterator(source.open());
final CloseableIterator<T> delegate = makeSourceIterator(source.open());
final int numHeaderLines = getNumHeaderLinesToSkip();
for (int i = 0; i < numHeaderLines && delegate.hasNext(); i++) {
delegate.next(); // skip lines
@ -63,7 +66,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
processHeaderLine(delegate.next());
}
return new CloseableIteratorWithMetadata<String>()
return new CloseableIteratorWithMetadata<T>()
{
private static final String LINE_KEY = "Line";
private long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
@ -81,7 +84,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
}
@Override
public String next()
public T next()
{
currentLineNumber++;
return delegate.next();
@ -108,7 +111,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
* This method will be called after {@link #getNumHeaderLinesToSkip()} and {@link #processHeaderLine}.
*/
@Override
public abstract List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException;
public abstract List<InputRow> parseInputRows(T intermediateRow) throws IOException, ParseException;
/**
* Returns the number of header lines to skip.
@ -125,23 +128,61 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
/**
* Processes a header line. This will be called if {@link #needsToProcessHeaderLine()} = true.
*/
public abstract void processHeaderLine(String line) throws IOException;
public abstract void processHeaderLine(T line) throws IOException;
public static List<String> findOrCreateColumnNames(List<String> parsedLine)
protected abstract CloseableIterator<T> makeSourceIterator(InputStream in);
public static RowSignature findOrCreateInputRowSignature(List<String> parsedLine)
{
final List<String> columns = new ArrayList<>(parsedLine.size());
for (int i = 0; i < parsedLine.size(); i++) {
if (Strings.isNullOrEmpty(parsedLine.get(i))) {
if (com.google.common.base.Strings.isNullOrEmpty(parsedLine.get(i))) {
columns.add(ParserUtils.getDefaultColumnName(i));
} else {
columns.add(parsedLine.get(i));
}
}
if (columns.isEmpty()) {
return ParserUtils.generateFieldNames(parsedLine.size());
} else {
ParserUtils.validateFields(columns);
return columns;
ParserUtils.validateFields(columns);
final RowSignature.Builder builder = RowSignature.builder();
for (final String column : columns) {
builder.add(column, null);
}
return builder.build();
}
public abstract static class Strings extends TextReader<String>
{
protected Strings(InputRowSchema inputRowSchema, InputEntity source)
{
super(inputRowSchema, source);
}
@Override
protected CloseableIterator<String> makeSourceIterator(InputStream in)
{
return new FastLineIterator.Strings(in);
}
}
public abstract static class Bytes extends TextReader<byte[]>
{
protected Bytes(InputRowSchema inputRowSchema, InputEntity source)
{
super(inputRowSchema, source);
}
@Override
protected CloseableIterator<byte[]> makeSourceIterator(InputStream in)
{
return new FastLineIterator.Bytes(in);
}
@Override
protected String intermediateRowAsString(@Nullable byte[] row)
{
// Like String.valueOf, but for UTF-8 bytes. Keeps error messages consistent between String and Bytes.
return row == null ? "null" : StringUtils.fromUtf8(row);
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.io.File;
@ -78,7 +79,8 @@ public class CsvInputFormat extends FlatTextInputFormat
getColumns(),
isFindColumnsFromHeader(),
getSkipHeaderRows(),
line -> Arrays.asList(parser.parseLine(line))
line -> Arrays.asList(parser.parseLine(StringUtils.fromUtf8(line))),
useListBasedInputRows()
);
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* Utility function for {@link DelimitedInputFormat}.
*/
public class DelimitedBytes
{
/**
* Parameter for {@link #split(byte[], byte, int)} signifying that we do not know the expected number of fields.
*/
public static final int UNKNOWN_FIELD_COUNT = -1;
/**
* Split UTF-8 bytes by a particular delimiter. When {@link NullHandling#sqlCompatible()}, empty parts are
* returned as nulls. When {@link NullHandling#replaceWithDefault()}, empty parts are returned as empty strings.
*
* @param bytes utf-8 bytes
* @param delimiter the delimiter
* @param numFieldsHint expected number of fields, or {@link #UNKNOWN_FIELD_COUNT}
*/
public static List<String> split(final byte[] bytes, final byte delimiter, final int numFieldsHint)
{
final List<String> out = numFieldsHint == UNKNOWN_FIELD_COUNT ? new ArrayList<>() : new ArrayList<>(numFieldsHint);
int start = 0;
int position = 0;
while (position < bytes.length) {
if (bytes[position] == delimiter) {
final String s = StringUtils.fromUtf8(bytes, start, position - start);
out.add(s.isEmpty() && NullHandling.sqlCompatible() ? null : s);
start = position + 1;
}
position++;
}
final String s = StringUtils.fromUtf8(bytes, start, position - start);
out.add(s.isEmpty() && NullHandling.sqlCompatible() ? null : s);
return out;
}
}
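
A behavior sketch for split(); the expected outputs follow directly from the
code above and from DelimitedBytesTest later in this commit:

    // Under NullHandling.sqlCompatible(), empty parts come back as null:
    final List<String> parts = DelimitedBytes.split(
        StringUtils.toUtf8("a\t\tb"),
        (byte) '\t',
        DelimitedBytes.UNKNOWN_FIELD_COUNT
    );
    // parts => ["a", null, "b"]
    // Under NullHandling.replaceWithDefault() it would be ["a", "", "b"].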

View File

@ -27,6 +27,7 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.io.File;
@ -80,10 +81,33 @@ public class DelimitedInputFormat extends FlatTextInputFormat
getColumns(),
isFindColumnsFromHeader(),
getSkipHeaderRows(),
line -> splitToList(Splitter.on(getDelimiter()), line)
makeDelimitedValueParser(
getDelimiter(),
useListBasedInputRows() ? getColumns().size() : DelimitedBytes.UNKNOWN_FIELD_COUNT
),
useListBasedInputRows()
);
}
/**
* Create a parser for a particular delimiter and expected number of fields.
*/
private static DelimitedValueReader.DelimitedValueParser makeDelimitedValueParser(
final String delimiter,
final int numFields
)
{
final byte[] utf8Delimiter = StringUtils.toUtf8(delimiter);
if (utf8Delimiter.length == 1) {
// Single-byte delimiter: split bytes directly, prior to UTF-8 conversion
final byte delimiterByte = utf8Delimiter[0];
return bytes -> DelimitedBytes.split(bytes, delimiterByte, numFields);
} else {
final Splitter splitter = Splitter.on(delimiter);
return bytes -> splitToList(splitter, StringUtils.fromUtf8(bytes));
}
}
/**
* Copied from Guava's {@link Splitter#splitToList(CharSequence)}.
* This is to avoid the error of the missing method signature when using an old Guava library.

View File

@ -22,19 +22,24 @@ package org.apache.druid.data.input.impl;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.ListBasedInputRow;
import org.apache.druid.data.input.TextReader;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;
import org.apache.druid.java.util.common.parsers.Parsers;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -42,18 +47,31 @@ import java.util.Map;
/**
* DelimitedValueReader is the reader for delimiter-separated value format input data (CSV/TSV).
*/
public class DelimitedValueReader extends TextReader
public class DelimitedValueReader extends TextReader.Bytes
{
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Function<String, Object> multiValueFunction;
private final DelimitedValueParser parser;
/**
* Signature of the delimited files. Written by {@link #setSignature(List)}.
*/
@Nullable
private List<String> columns;
private RowSignature inputRowSignature;
/**
* Dimensions list used for generating {@link InputRow}. Derived from {@link #getInputRowSchema()}, but has
* dimensions locked-in based on the {@link #inputRowSignature}, so they do not need to be recalculated on each row.
* Written by {@link #setSignature(List)}.
*/
@Nullable
private List<String> inputRowDimensions;
private final boolean useListBasedInputRows;
interface DelimitedValueParser
{
List<String> parseLine(String line) throws IOException;
List<String> parseLine(byte[] line) throws IOException;
}
DelimitedValueReader(
@ -63,7 +81,8 @@ public class DelimitedValueReader extends TextReader
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows,
DelimitedValueParser parser
DelimitedValueParser parser,
boolean useListBasedInputRows
)
{
super(inputRowSchema, source);
@ -71,28 +90,58 @@ public class DelimitedValueReader extends TextReader
this.skipHeaderRows = skipHeaderRows;
final String finalListDelimiter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimiter, Splitter.on(finalListDelimiter));
this.columns = findColumnsFromHeader ? null : columns; // columns will be overridden by header row
if (!findColumnsFromHeader && columns != null) {
// If findColumnsFromHeader, inputRowSignature will be set later.
setSignature(columns);
}
this.parser = parser;
this.useListBasedInputRows = useListBasedInputRows;
}
@Override
public List<InputRow> parseInputRows(String line) throws IOException, ParseException
public List<InputRow> parseInputRows(byte[] line) throws IOException, ParseException
{
final Map<String, Object> zipped = parseLine(line);
return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped));
if (useListBasedInputRows) {
final List<Object> parsed = readLineAsList(line);
return Collections.singletonList(
ListBasedInputRow.parse(
inputRowSignature,
getInputRowSchema().getTimestampSpec(),
inputRowDimensions,
parsed
)
);
} else {
final Map<String, Object> zipped = readLineAsMap(line);
return Collections.singletonList(
MapInputRowParser.parse(
getInputRowSchema().getTimestampSpec(),
inputRowDimensions,
zipped
)
);
}
}
@Override
public List<Map<String, Object>> toMap(String intermediateRow) throws IOException
public List<Map<String, Object>> toMap(byte[] intermediateRow) throws IOException
{
return Collections.singletonList(parseLine(intermediateRow));
return Collections.singletonList(readLineAsMap(intermediateRow));
}
private Map<String, Object> parseLine(String line) throws IOException
private List<Object> readLineAsList(byte[] line) throws IOException
{
final List<String> parsed = parser.parseLine(line);
return new ArrayList<>(Lists.transform(parsed, multiValueFunction));
}
private Map<String, Object> readLineAsMap(byte[] line) throws IOException
{
final List<String> parsed = parser.parseLine(line);
return Utils.zipMapPartial(
Preconditions.checkNotNull(columns, "columns"),
Preconditions.checkNotNull(inputRowSignature, "inputRowSignature").getColumnNames(),
Iterables.transform(parsed, multiValueFunction)
);
}
@ -110,14 +159,31 @@ public class DelimitedValueReader extends TextReader
}
@Override
public void processHeaderLine(String line) throws IOException
public void processHeaderLine(byte[] line) throws IOException
{
if (!findColumnsFromHeader) {
throw new ISE("Don't call this if findColumnsFromHeader = false");
}
columns = findOrCreateColumnNames(parser.parseLine(line));
if (columns.isEmpty()) {
setSignature(parser.parseLine(line));
}
/**
* Set {@link #inputRowDimensions} and {@link #inputRowSignature} based on a set of header columns. Must be called
* prior to {@link #parseInputRows(byte[])}.
*
* @param columns header columns
*/
private void setSignature(final List<String> columns)
{
inputRowSignature = findOrCreateInputRowSignature(columns);
if (inputRowSignature.size() == 0) {
throw new ISE("Empty columns");
}
inputRowDimensions = MapInputRowParser.findDimensions(
getInputRowSchema().getTimestampSpec(),
getInputRowSchema().getDimensionsSpec(),
ImmutableSet.copyOf(inputRowSignature.getColumnNames())
);
}
}

View File

@ -20,64 +20,77 @@
package org.apache.druid.data.input.impl;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import com.google.common.primitives.Longs;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
/**
* Like the Apache Commons LineIterator, but faster.
*
* Use {@link Strings} or {@link Bytes} as appropriate. Bytes is faster.
*/
public class FastLineIterator implements CloseableIterator<String>
public abstract class FastLineIterator<T> implements CloseableIterator<T>
{
// visible for tests
static final int BUFFER_SIZE = 512;
private static final ThreadLocal<byte[]> BUFFER_LOCAL = ThreadLocal.withInitial(() -> new byte[BUFFER_SIZE]);
private static final byte CR = (byte) '\r';
private static final byte LF = (byte) '\n';
private final InputStream source;
private final ByteArrayList buffer;
private ByteBuffer buffer;
private int lineStart = 0;
private int position = 0;
private int limit = 0;
private boolean endOfStream;
private T nextLine;
private String nextLine;
/**
* {@link #LF} in every byte.
*/
private static final long LF_REPEAT = firstOccurrencePattern(LF);
/**
* Constructor; a local buffer will be created
*
* @param source
*/
public FastLineIterator(InputStream source)
protected FastLineIterator(InputStream source)
{
this(source, new ByteArrayList());
this(source, new byte[BUFFER_SIZE]);
}
/**
* Constructor; BYO buffer.
*
* Existing contents of the buffer will be destroyed.
*
* @param source
* @param buffer a buffer used for between-read calls
*/
public FastLineIterator(InputStream source, ByteArrayList buffer)
protected FastLineIterator(InputStream source, byte[] buffer)
{
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(buffer);
this.source = source;
this.nextLine = null;
this.buffer = buffer;
this.buffer.size(0);
setBuffer(buffer);
}
@Override
public void close() throws IOException
{
nextLine = null;
buffer = null;
source.close();
// Note: do not remove the thread local buffer; retain it for reuse later
}
@Override
@ -88,76 +101,188 @@ public class FastLineIterator implements CloseableIterator<String>
return true;
}
readNextLine();
nextLine = readNextLine();
return nextLine != null;
}
@Override
public String next()
public T next()
{
if (!hasNext()) {
throw new NoSuchElementException("no more lines");
}
String result = nextLine;
T result = nextLine;
nextLine = null;
return result;
}
void readNextLine()
private T readNextLine()
{
byte[] load = BUFFER_LOCAL.get();
boolean endOfFile = false;
// load data until finished or found a line feed
int indexOfLf = buffer.indexOf(LF);
while (!endOfFile && indexOfLf < 0) {
int readCount;
try {
readCount = source.read(load);
}
catch (IOException e) {
nextLine = null;
throw new IllegalStateException(e);
}
if (readCount < 0) {
endOfFile = true;
while (true) {
// See if there's another line already available in the buffer.
if (endOfStream) {
return readLineFromBuffer();
} else {
int sizeBefore = buffer.size();
buffer.addElements(buffer.size(), load, 0, readCount);
// Look for next LF with 8-byte stride.
while (position < limit - Long.BYTES) {
final long w = buffer.getLong(position);
final int index = firstOccurrence(w, LF_REPEAT);
if (index < Long.BYTES) {
position = position + index;
return readLineFromBuffer();
} else {
position += Long.BYTES;
}
}
// check if there were any LFs in the newly collected data
for (int i = 0; i < readCount; i++) {
if (load[i] == LF) {
indexOfLf = sizeBefore + i;
break;
// Look for next LF with 1-byte stride.
for (; position < limit; position++) {
if (buffer.get(position) == LF) {
return readLineFromBuffer();
}
}
}
// No line available in the buffer.
// Ensure space exists to read at least one more byte.
final int available = buffer.capacity() - limit;
if (available == 0) {
final int currentLength = limit - lineStart;
if (lineStart == 0) {
// Allocate a larger buffer.
final byte[] newBuf = new byte[buffer.capacity() * 2];
System.arraycopy(buffer.array(), lineStart, newBuf, 0, currentLength);
setBuffer(newBuf);
} else {
// Move current line to the start of the existing buffer.
System.arraycopy(buffer.array(), lineStart, buffer.array(), 0, currentLength);
}
position -= lineStart;
limit -= lineStart;
lineStart = 0;
}
// Read as much as we can.
try {
final int bytesRead = source.read(buffer.array(), limit, buffer.capacity() - limit);
if (bytesRead < 0) {
// End of stream.
endOfStream = true;
return readLineFromBuffer();
}
limit += bytesRead;
}
catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
@Nullable
private T readLineFromBuffer()
{
for (; position < limit && buffer.get(position) != LF; position++) {
// Skip to limit or next LF
}
if (endOfFile && buffer.size() == 0) {
// empty line and end of file
nextLine = null;
final boolean isLf = position < limit && buffer.get(position) == LF;
} else if (indexOfLf < 0) {
// no LF at all; end of input
nextLine = StringUtils.fromUtf8(buffer);
buffer.removeElements(0, buffer.size());
} else if (indexOfLf >= 1 && buffer.getByte(indexOfLf - 1) == CR) {
final int lineEnd;
if (isLf && position > lineStart && buffer.get(position - 1) == CR) {
// CR LF
nextLine = StringUtils.fromUtf8(buffer.elements(), 0, indexOfLf - 1);
buffer.removeElements(0, indexOfLf + 1);
} else {
lineEnd = position - 1;
} else if (isLf) {
// LF only
nextLine = StringUtils.fromUtf8(buffer.elements(), 0, indexOfLf);
buffer.removeElements(0, indexOfLf + 1);
lineEnd = position;
} else if (endOfStream) {
// End of stream
lineEnd = position;
} else {
// There wasn't a line after all
throw DruidException.defensive("No line to read");
}
if (lineStart == limit && endOfStream) {
// null signifies no more lines are coming.
return null;
}
final T retVal = makeObject(buffer.array(), lineStart, lineEnd - lineStart);
if (position < limit) {
position++;
}
lineStart = position;
return retVal;
}
protected abstract T makeObject(byte[] bytes, int offset, int length);
private void setBuffer(final byte[] buffer)
{
this.buffer = ByteBuffer.wrap(buffer).order(ByteOrder.BIG_ENDIAN);
}
/**
* Find the first {@link #LF} byte in a long. Returns 8 if there are no {@link #LF} bytes.
*
* @param n input long
* @param pattern pattern generated by repeating the byte 8 times. Use
* {@link #firstOccurrencePattern(byte)} to compute.
*/
private static int firstOccurrence(long n, long pattern)
{
// Xor with LF_REPEAT to turn LF bytes into zero-bytes.
final long xored = n ^ pattern;
// Apply test from https://graphics.stanford.edu/~seander/bithacks.html#ValueInWord, which zeroes out all
// non-zero bytes, and sets the high bits for bytes that were zero.
final long zeroTest = (((xored - 0x0101010101010101L) & ~(xored) & 0x8080808080808080L));
// Count number of leading zeroes, which will be a multiple of 8, then divide by 8.
return Long.numberOfLeadingZeros(zeroTest) >>> 3;
}
/**
* Generate a search pattern for {@link #firstOccurrence(long, long)}.
*/
private static long firstOccurrencePattern(final byte b)
{
return Longs.fromBytes(b, b, b, b, b, b, b, b);
}
public static class Strings extends FastLineIterator<String>
{
public Strings(final InputStream source)
{
super(source);
}
@Override
protected String makeObject(byte[] bytes, int offset, int length)
{
return StringUtils.fromUtf8(bytes, offset, length);
}
}
public static class Bytes extends FastLineIterator<byte[]>
{
public Bytes(final InputStream source)
{
super(source);
}
@Override
protected byte[] makeObject(byte[] bytes, int offset, int length)
{
final byte[] retVal = new byte[length];
System.arraycopy(bytes, offset, retVal, 0, length);
return retVal;
}
}
}
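
A worked instance of the firstOccurrence() bit trick, to make the 8-byte
stride concrete (a standalone sketch, not part of the patch):

    final long pattern = 0x0A0A0A0A0A0A0A0AL;  // LF repeated in every byte
    final long n = 0x61620A6364656667L;        // "ab\ncdefg" read big-endian
    final long xored = n ^ pattern;            // the LF byte becomes 0x00
    final long zeroTest = (xored - 0x0101010101010101L) & ~xored & 0x8080808080808080L;
    final int index = Long.numberOfLeadingZeros(zeroTest) >>> 3;  // = 2, the LF offset
    // When no byte matches, zeroTest is 0, numberOfLeadingZeros() returns 64,
    // and 64 >>> 3 = 8, matching the Long.BYTES "not found" sentinel.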

View File

@ -23,7 +23,15 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.ListBasedInputRow;
import org.apache.druid.data.input.ListBasedInputRowAdapter;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
@ -123,6 +131,30 @@ public abstract class FlatTextInputFormat implements InputFormat
return skipHeaderRows;
}
@Override
public long getWeightedSize(String path, long size)
{
CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
if (CompressionUtils.Format.GZ == compressionFormat) {
return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
}
return size;
}
@Override
public RowAdapter<InputRow> createRowAdapter(InputRowSchema inputRowSchema)
{
if (useListBasedInputRows()) {
final RowSignature.Builder builder = RowSignature.builder();
for (final String column : columns) {
builder.add(column, null);
}
return new ListBasedInputRowAdapter(builder.build());
} else {
return RowAdapters.standardRow();
}
}
@Override
public boolean equals(Object o)
{
@ -140,31 +172,30 @@ public abstract class FlatTextInputFormat implements InputFormat
Objects.equals(delimiter, that.delimiter);
}
@Override
public long getWeightedSize(String path, long size)
{
CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
if (CompressionUtils.Format.GZ == compressionFormat) {
return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
}
return size;
}
@Override
public int hashCode()
{
return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, delimiter);
}
/**
* Whether this format can use {@link ListBasedInputRow}, which is preferred over {@link MapBasedInputRow} when
* possible. Subclasses pass this to {@link DelimitedValueReader}.
*/
protected boolean useListBasedInputRows()
{
return !columns.isEmpty() && !findColumnsFromHeader;
}
protected String fieldsToString()
{
return "FlatTextInputFormat{"
+ "delimiter=\"" + delimiter
+ "\"listDelimiter="
+ listDelimiter == null ? "null" : "\"" + listDelimiter + "\""
+ ", findColumnsFromHeader=" + findColumnsFromHeader
+ ", skipHeaderRows=" + skipHeaderRows
+ ", columns=" + columns
+ "}";
+ "delimiter=\"" + delimiter
+ "\"listDelimiter="
+ (listDelimiter == null ? "null" : "\"" + listDelimiter + "\"")
+ ", findColumnsFromHeader=" + findColumnsFromHeader
+ ", skipHeaderRows=" + skipHeaderRows
+ ", columns=" + columns
+ "}";
}
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.RowAdapter;
import java.io.File;
import java.io.IOException;
@ -105,6 +106,12 @@ public class InputEntityIteratingReader implements InputSourceReader
});
}
@Override
public RowAdapter<InputRow> rowAdapter()
{
return inputFormat.createRowAdapter(inputRowSchema);
}
private <R> CloseableIterator<R> createIterator(Function<InputEntity, CloseableIterator<R>> rowPopulator)
{
return sourceIterator.flatMap(rowPopulator);

View File

@ -45,7 +45,7 @@ import java.util.Map;
* This also means that each text line should be a well-formed JSON text, pretty-printed format is not allowed
*
*/
public class JsonLineReader extends TextReader
public class JsonLineReader extends TextReader.Bytes
{
private final ObjectFlattener<JsonNode> flattener;
private final ObjectMapper mapper;
@ -70,7 +70,7 @@ public class JsonLineReader extends TextReader
}
@Override
public List<InputRow> parseInputRows(String line) throws IOException, ParseException
public List<InputRow> parseInputRows(byte[] line) throws IOException, ParseException
{
final JsonNode document = mapper.readValue(line, JsonNode.class);
final Map<String, Object> flattened = flattener.flatten(document);
@ -78,7 +78,7 @@ public class JsonLineReader extends TextReader
}
@Override
public List<Map<String, Object>> toMap(String intermediateRow) throws IOException
public List<Map<String, Object>> toMap(byte[] intermediateRow) throws IOException
{
//noinspection unchecked
return Collections.singletonList(mapper.readValue(intermediateRow, Map.class));
@ -97,7 +97,7 @@ public class JsonLineReader extends TextReader
}
@Override
public void processHeaderLine(String line)
public void processHeaderLine(byte[] line)
{
// do nothing
}

View File

@ -37,6 +37,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
@ -80,8 +81,17 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
theMap == null ? Collections.emptySet() : theMap.keySet()
);
return parse(timestampSpec, dimensionsToUse, theMap);
}
public static InputRow parse(
TimestampSpec timestampSpec,
List<String> dimensions,
Map<String, Object> theMap
) throws ParseException
{
final DateTime timestamp = parseTimestamp(timestampSpec, theMap);
return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
return new MapBasedInputRow(timestamp, dimensions, theMap);
}
/**
@ -89,11 +99,11 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
* There are 3 cases here.
*
* 1) If {@link DimensionsSpec#isIncludeAllDimensions()} is set, the returned list includes _both_
* {@link DimensionsSpec#getDimensionNames()} and the dimensions in the given map ({@code rawInputRow#keySet()}).
* {@link DimensionsSpec#getDimensionNames()} and the dimensions in the given map ({@code rawInputRow#keySet()}).
* 2) If isIncludeAllDimensions is not set and {@link DimensionsSpec#getDimensionNames()} is not empty,
* the dimensions in dimensionsSpec is returned.
* the dimensions in dimensionsSpec is returned.
* 3) If isIncludeAllDimensions is not set and {@link DimensionsSpec#getDimensionNames()} is empty,
* the dimensions in the given map is returned.
* the dimensions in the given map is returned.
*
* In any case, the returned list does not include any dimensions in {@link DimensionsSpec#getDimensionExclusions()}
* or {@link TimestampSpec#getTimestampColumn()}.
@ -132,32 +142,54 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
}
public static DateTime parseTimestamp(TimestampSpec timestampSpec, Map<String, Object> theMap)
{
return parseTimestampOrThrowParseException(
timestampSpec.getRawTimestamp(theMap),
timestampSpec,
() -> theMap
);
}
/**
* Given a plain Java Object, extract a timestamp from it using the provided {@link TimestampSpec}, or throw
* a {@link ParseException} if we can't.
*
* @param timeValue object to interpret as timestamp
* @param timestampSpec timestamp spec
* @param rawMapSupplier supplier of the original raw data that this object came from. Used for error messages.
*/
public static DateTime parseTimestampOrThrowParseException(
final Object timeValue,
final TimestampSpec timestampSpec,
final Supplier<Map<String, ?>> rawMapSupplier
)
{
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
timestamp = timestampSpec.parseDateTime(timeValue);
}
catch (Exception e) {
String rawMap = rawMapToPrint(theMap);
String rawMap = rawMapToPrint(rawMapSupplier.get());
throw new ParseException(
rawMap,
e,
"Timestamp[%s] is unparseable! Event: %s",
timestampSpec.getRawTimestamp(theMap),
timeValue,
rawMap
);
}
if (timestamp == null) {
String rawMap = rawMapToPrint(theMap);
String rawMap = rawMapToPrint(rawMapSupplier.get());
throw new ParseException(
rawMap,
"Timestamp[%s] is unparseable! Event: %s",
timestampSpec.getRawTimestamp(theMap),
timeValue,
rawMap
);
}
if (!Intervals.ETERNITY.contains(timestamp)) {
String rawMap = rawMapToPrint(theMap);
String rawMap = rawMapToPrint(rawMapSupplier.get());
throw new ParseException(
rawMap,
"Encountered row with timestamp[%s] that cannot be represented as a long: [%s]",
@ -169,7 +201,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
}
@Nullable
private static String rawMapToPrint(@Nullable Map<String, Object> rawMap)
private static String rawMapToPrint(@Nullable Map<String, ?> rawMap)
{
if (rawMap == null) {
return null;

View File

@ -39,7 +39,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class RegexReader extends TextReader
public class RegexReader extends TextReader.Strings
{
private final String pattern;
private final Pattern compiledPattern;

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.RowAdapter;
import org.joda.time.DateTime;
import java.io.IOException;
@ -135,4 +136,10 @@ public class TimedShutoffInputSourceReader implements InputSourceReader
return wrappingIterator;
}
@Override
public RowAdapter<InputRow> rowAdapter()
{
return delegate.rowAdapter();
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.java.util.common;
import com.google.common.base.Strings;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import org.apache.commons.io.IOUtils;
import javax.annotation.Nonnull;
@ -261,11 +260,6 @@ public class StringUtils
return StringUtils.fromUtf8(buffer, buffer.remaining());
}
public static String fromUtf8(final ByteArrayList buffer)
{
return StringUtils.fromUtf8(buffer.elements(), 0, buffer.size());
}
/**
* Decodes a UTF-8 string from the remaining bytes of a buffer.
* Advances the position of the buffer by {@link ByteBuffer#remaining()}.

View File

@ -100,7 +100,7 @@ public abstract class AbstractFlatTextFormatParser implements Parser<String, Obj
public void setFieldNames(final Iterable<String> fieldNames)
{
if (fieldNames != null) {
this.fieldNames = TextReader.findOrCreateColumnNames(Lists.newArrayList(fieldNames));
this.fieldNames = TextReader.findOrCreateInputRowSignature(Lists.newArrayList(fieldNames)).getColumnNames();
}
}

View File

@ -32,6 +32,7 @@ import com.google.common.primitives.Longs;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.ListBasedInputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -684,11 +685,13 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return ((MapBasedInputRow) inputRow).getEvent().toString();
}
if (inputRow instanceof ListBasedInputRow) {
return ((ListBasedInputRow) inputRow).asMap().toString();
}
if (inputRow instanceof TransformedInputRow) {
InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
if (innerRow instanceof MapBasedInputRow) {
return ((MapBasedInputRow) innerRow).getEvent().toString();
}
return getSimplifiedEventStringFromRow(innerRow);
}
return inputRow.toString();

View File

@ -20,17 +20,17 @@
package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.ListBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -163,59 +163,63 @@ public class CsvReaderTest
"65,Here I write \\n slash n,2018-05-09T10:00:00Z"
)
);
final RowSignature signature =
RowSignature.builder()
.add("Value", null)
.add("Comment", null)
.add("Timestamp", null)
.build();
final List<InputRow> expectedResults = ImmutableList.of(
new MapBasedInputRow(
new ListBasedInputRow(
signature,
DateTimes.of("2018-05-05T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
ImmutableList.of(
"3",
"Comment",
"Lets do some \"normal\" quotes",
"Timestamp",
"2018-05-05T10:00:00Z"
)
),
new MapBasedInputRow(
new ListBasedInputRow(
signature,
DateTimes.of("2018-05-06T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
ImmutableList.of(
"34",
"Comment",
"Lets do some \"normal\", quotes with comma",
"Timestamp",
"2018-05-06T10:00:00Z"
)
),
new MapBasedInputRow(
new ListBasedInputRow(
signature,
DateTimes.of("2018-05-07T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
ImmutableList.of(
"343",
"Comment",
"Lets try \\\"it\\\" with slash quotes",
"Timestamp",
"2018-05-07T10:00:00Z"
)
),
new MapBasedInputRow(
new ListBasedInputRow(
signature,
DateTimes.of("2018-05-08T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of(
"Value",
ImmutableList.of(
"545",
"Comment",
"Lets try \\\"it\\\", with slash quotes and comma",
"Timestamp",
"2018-05-08T10:00:00Z"
)
),
new MapBasedInputRow(
new ListBasedInputRow(
signature,
DateTimes.of("2018-05-09T10:00:00Z"),
ImmutableList.of("Timestamp"),
ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z")
ImmutableList.of(
"65",
"Here I write \\n slash n",
"2018-05-09T10:00:00Z"
)
)
);
final CsvInputFormat format = new CsvInputFormat(
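The rewritten fixtures share one RowSignature across all rows and pass values positionally, replacing the per-row column-to-value maps. Reading rows back should be unchanged; a sketch, assuming values align with the signature's column order:

// Sketch: dimension access is still by name, resolved through the shared signature.
InputRow row = expectedResults.get(4);
Assert.assertEquals(ImmutableList.of("65"), row.getDimension("Value"));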

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class DelimitedBytesTest extends InitializedNullHandlingTest
{
private static final byte TSV = (byte) '\t';
@Test
public void testEmpty()
{
Assert.assertEquals(
Collections.singletonList(NullHandling.sqlCompatible() ? null : ""),
DelimitedBytes.split(new byte[0], TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT)
);
}
@Test
public void testNoDelimiter()
{
Assert.assertEquals(
Collections.singletonList("abc"),
DelimitedBytes.split(StringUtils.toUtf8("abc"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT)
);
}
@Test
public void testOneDelimiter()
{
Assert.assertEquals(
Arrays.asList("a", "bc"),
DelimitedBytes.split(StringUtils.toUtf8("a\tbc"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT)
);
}
@Test
public void testDelimiterAtStart()
{
Assert.assertEquals(
Arrays.asList(NullHandling.sqlCompatible() ? null : "", "abc"),
DelimitedBytes.split(StringUtils.toUtf8("\tabc"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT)
);
}
@Test
public void testDelimiterAtEnd()
{
Assert.assertEquals(
Arrays.asList("a", "bc", NullHandling.sqlCompatible() ? null : ""),
DelimitedBytes.split(StringUtils.toUtf8("a\tbc\t"), TSV, DelimitedBytes.UNKNOWN_FIELD_COUNT)
);
}
@Test
public void testMoreFieldsThanHint()
{
Assert.assertEquals(
Arrays.asList("a", "b", "c"),
DelimitedBytes.split(StringUtils.toUtf8("a\tb\tc"), TSV, 1)
);
}
}
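Taken together, these tests pin down the splitter's contract: the field-count hint only presizes the output (extra fields are still returned), and empty fields follow NullHandling. A minimal sketch consistent with them, offered as an assumption rather than the actual DelimitedBytes implementation; it reuses the StringUtils.fromUtf8(byte[], offset, length) overload called in the StringUtils diff above:

// Sketch: split UTF-8 bytes on a single-byte delimiter.
static List<String> split(final byte[] bytes, final byte delimiter, final int numFieldsHint)
{
  final List<String> out =
      numFieldsHint == DelimitedBytes.UNKNOWN_FIELD_COUNT
      ? new ArrayList<>()
      : new ArrayList<>(numFieldsHint); // hint presizes; it never truncates
  int start = 0;
  for (int i = 0; i < bytes.length; i++) {
    if (bytes[i] == delimiter) {
      out.add(NullHandling.emptyToNullIfNeeded(StringUtils.fromUtf8(bytes, start, i - start)));
      start = i + 1;
    }
  }
  // Trailing field, including the empty one after a delimiter at end of input.
  out.add(NullHandling.emptyToNullIfNeeded(StringUtils.fromUtf8(bytes, start, bytes.length - start)));
  return out;
}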

View File

@@ -47,14 +47,14 @@ public class FastLineIteratorTest
{
expectedException.expect(NullPointerException.class);
//noinspection ResultOfObjectAllocationIgnored
new FastLineIterator(null);
new FastLineIterator.Strings(null);
}
@Test
public void testEmptyInput()
{
byte[] input = new byte[0];
FastLineIterator iterator = new FastLineIterator(new ByteArrayInputStream(input));
FastLineIterator<String> iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertFalse(iterator.hasNext());
@@ -66,12 +66,12 @@ public class FastLineIteratorTest
public void testSoloCr()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
// a single \r
// it is expected that this emits a complete line with \r since a return on its own is not a line break
input = "\r".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("\r", iterator.next());
@@ -82,12 +82,12 @@ public class FastLineIteratorTest
public void testSoloLf()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
// a single \n
// should emit a single complete 'line' as "", and no trailing line (since EOF)
input = "\n".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("", iterator.next());
@@ -98,13 +98,13 @@ public class FastLineIteratorTest
public void testBackwardsLfCr()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
// should emit two lines:
// first one is an empty line for before the \n,
// second is the \r alone
input = "\n\r".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("", iterator.next());
@@ -117,11 +117,11 @@ public class FastLineIteratorTest
public void testForwardsSoloCrLf()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
// should emit one (empty) line
input = "\r\n".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("", iterator.next());
@@ -132,11 +132,11 @@ public class FastLineIteratorTest
public void testSingleLine()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
// without an end
input = "abcd".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("abcd", iterator.next());
@@ -144,7 +144,7 @@ public class FastLineIteratorTest
// with an end
input = "abcd\n".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("abcd", iterator.next());
@@ -152,7 +152,7 @@ public class FastLineIteratorTest
// with an end
input = "abcd\r\n".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("abcd", iterator.next());
@@ -163,10 +163,10 @@ public class FastLineIteratorTest
public void testMultipleLines()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
input = "abcd\ndefg\nhijk".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("abcd", iterator.next());
@@ -179,10 +179,10 @@ public class FastLineIteratorTest
public void testEmptyMiddleLine()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
input = "abcd\n\nhijk\n".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("abcd", iterator.next());
@@ -195,10 +195,10 @@ public class FastLineIteratorTest
public void testEmptyLastLine()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
input = "abcd\ndefg\nhijk\n".getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals("abcd", iterator.next());
@@ -211,14 +211,14 @@ public class FastLineIteratorTest
public void testOverlappingBuffer()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
String line1 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE - 20);
String line2 = RandomStringUtils.random(40);
String line3 = RandomStringUtils.random(20);
String line1 = randomString(FastLineIterator.BUFFER_SIZE - 20);
String line2 = randomString(40);
String line3 = randomString(20);
input = (line1 + "\n" + line2 + "\n" + line3 + "\n").getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(line1, iterator.next());
@@ -231,15 +231,15 @@ public class FastLineIteratorTest
public void testLineLargerThanBufferSize()
{
byte[] input;
FastLineIterator iterator;
FastLineIterator<String> iterator;
// random lengths that force multiple buffer trips
String line1 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE * 3 + 10);
String line2 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE * 2 + 15);
String line3 = RandomStringUtils.random(FastLineIterator.BUFFER_SIZE + 9);
String line1 = randomString(FastLineIterator.BUFFER_SIZE * 3 + 10);
String line2 = randomString(FastLineIterator.BUFFER_SIZE * 2 + 15);
String line3 = randomString(FastLineIterator.BUFFER_SIZE + 9);
input = (line1 + "\r\n" + line2 + "\r\n" + line3 + "\r\n").getBytes(StandardCharsets.UTF_8);
iterator = new FastLineIterator(new ByteArrayInputStream(input));
iterator = new FastLineIterator.Strings(new ByteArrayInputStream(input));
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(line1, iterator.next());
@@ -247,4 +247,14 @@ public class FastLineIteratorTest
Assert.assertEquals(line3, iterator.next());
Assert.assertFalse(iterator.hasNext());
}
/**
* Random string that does not contain \r or \n.
*/
private static String randomString(final int length)
{
return RandomStringUtils.random(length)
.replace('\r', '?')
.replace('\n', '?');
}
}
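The randomString helper above exists because RandomStringUtils.random can emit arbitrary code points, including \r and \n, which would introduce extra line breaks and throw off the expected line counts. The tests also show the iterator's new shape: generic in its line type, with a Strings specialization. A usage sketch, with the import path assumed:

byte[] bytes = "a\nb\r\nc".getBytes(StandardCharsets.UTF_8);
FastLineIterator<String> lines = new FastLineIterator.Strings(new ByteArrayInputStream(bytes));
while (lines.hasNext()) {
  System.out.println(lines.next()); // prints "a", then "b", then "c"
}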