mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Make ParseExceptions more informative (#12259)
This PR makes the ParseExceptions in Druid more informative by attaching metadata that describes where the exception originated, for example the path of the file that produced the error and the line number (where it can be fetched easily, as in CsvReader). The following changes are made in this PR: A new interface, CloseableIteratorWithMetadata, has been created; it is like CloseableIterator but also has a currentMetadata() method that returns a context Map<String, Object> about the element most recently returned by next(). IntermediateRowParsingReader#read() now attaches the InputEntity and the "record number" of the record that caused the exception (while parsing it), and IntermediateRowParsingReader#sample attaches the InputEntity (but not the "record number"). TextReader (and its subclasses), which is a specific implementation of IntermediateRowParsingReader, also includes the number of the line that caused the error. This also helps in triaging issues when an InputSourceReader generates a ParseException, because the message can point to the specific InputEntity that caused the exception (while trying to read it).
This commit is contained in:
parent
d105519558
commit
3f709db173
@ -19,10 +19,15 @@
|
||||
|
||||
package org.apache.druid.data.input;
|
||||
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.UOE;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
|
||||
import org.apache.druid.java.util.common.parsers.ParseException;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -41,7 +46,7 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
|
||||
@Override
|
||||
public CloseableIterator<InputRow> read() throws IOException
|
||||
{
|
||||
final CloseableIterator<T> intermediateRowIterator = intermediateRowIterator();
|
||||
final CloseableIteratorWithMetadata<T> intermediateRowIteratorWithMetadata = intermediateRowIteratorWithMetadata();
|
||||
|
||||
return new CloseableIterator<InputRow>()
|
||||
{
|
||||
@ -52,28 +57,42 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
|
||||
// good idea. Subclasses could implement read() with some duplicate codes to avoid unnecessary iteration on
|
||||
// a singleton list.
|
||||
Iterator<InputRow> rows = null;
|
||||
long currentRecordNumber = 1;
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
if (rows == null || !rows.hasNext()) {
|
||||
if (!intermediateRowIterator.hasNext()) {
|
||||
if (!intermediateRowIteratorWithMetadata.hasNext()) {
|
||||
return false;
|
||||
}
|
||||
final T row = intermediateRowIterator.next();
|
||||
final T row = intermediateRowIteratorWithMetadata.next();
|
||||
try {
|
||||
rows = parseInputRows(row).iterator();
|
||||
++currentRecordNumber;
|
||||
}
|
||||
catch (IOException e) {
|
||||
final Map<String, Object> metadata = intermediateRowIteratorWithMetadata.currentMetadata();
|
||||
rows = new ExceptionThrowingIterator(new ParseException(
|
||||
String.valueOf(row),
|
||||
e,
|
||||
"Unable to parse row [%s]",
|
||||
row
|
||||
buildParseExceptionMessage(
|
||||
StringUtils.format("Unable to parse row [%s]", row),
|
||||
source(),
|
||||
currentRecordNumber,
|
||||
metadata
|
||||
)
|
||||
));
|
||||
}
|
||||
catch (ParseException e) {
|
||||
rows = new ExceptionThrowingIterator(e);
|
||||
final Map<String, Object> metadata = intermediateRowIteratorWithMetadata.currentMetadata();
|
||||
// Replace the message of the ParseException e
|
||||
rows = new ExceptionThrowingIterator(
|
||||
new ParseException(
|
||||
e.getInput(),
|
||||
e.isFromPartiallyValidRow(),
|
||||
buildParseExceptionMessage(e.getMessage(), source(), currentRecordNumber, metadata)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,7 +112,7 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
intermediateRowIterator.close();
|
||||
intermediateRowIteratorWithMetadata.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -101,43 +120,128 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
|
||||
@Override
|
||||
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
|
||||
{
|
||||
return intermediateRowIterator().map(row -> {
|
||||
|
||||
final List<Map<String, Object>> rawColumnsList;
|
||||
try {
|
||||
rawColumnsList = toMap(row);
|
||||
}
|
||||
catch (Exception e) {
|
||||
return InputRowListPlusRawValues.of(null,
|
||||
new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", row));
|
||||
final CloseableIteratorWithMetadata<T> delegate = intermediateRowIteratorWithMetadata();
|
||||
|
||||
return new CloseableIterator<InputRowListPlusRawValues>()
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
|
||||
return InputRowListPlusRawValues.of(null,
|
||||
new ParseException(String.valueOf(row), "No map object parsed for row [%s]", row));
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return delegate.hasNext();
|
||||
}
|
||||
|
||||
List<InputRow> rows;
|
||||
try {
|
||||
rows = parseInputRows(row);
|
||||
}
|
||||
catch (ParseException e) {
|
||||
return InputRowListPlusRawValues.ofList(rawColumnsList, e);
|
||||
}
|
||||
catch (IOException e) {
|
||||
ParseException exception = new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into inputRow", row);
|
||||
return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
|
||||
}
|
||||
@Override
|
||||
public InputRowListPlusRawValues next()
|
||||
{
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
|
||||
});
|
||||
return sampleIntermediateRow(delegate.next(), delegate.currentMetadata());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses and samples the intermediate row and returns input row and the raw values in it. Metadata supplied can
|
||||
* contain information about the source which will get surfaced in case an exception occurs while parsing the
|
||||
* intermediate row
|
||||
*
|
||||
* @param row intermediate row
|
||||
* @param metadata additional information about the source and the record getting parsed
|
||||
* @return sampled data from the intermediate row
|
||||
*/
|
||||
private InputRowListPlusRawValues sampleIntermediateRow(T row, Map<String, Object> metadata)
|
||||
{
|
||||
|
||||
final List<Map<String, Object>> rawColumnsList;
|
||||
try {
|
||||
rawColumnsList = toMap(row);
|
||||
}
|
||||
catch (Exception e) {
|
||||
return InputRowListPlusRawValues.of(
|
||||
null,
|
||||
new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
|
||||
StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", row),
|
||||
source(),
|
||||
null,
|
||||
metadata
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
|
||||
return InputRowListPlusRawValues.of(
|
||||
null,
|
||||
new ParseException(String.valueOf(row), buildParseExceptionMessage(
|
||||
StringUtils.nonStrictFormat("No map object parsed for row [%s]", row),
|
||||
source(),
|
||||
null,
|
||||
metadata
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
List<InputRow> rows;
|
||||
try {
|
||||
rows = parseInputRows(row);
|
||||
}
|
||||
catch (ParseException e) {
|
||||
return InputRowListPlusRawValues.ofList(rawColumnsList, new ParseException(
|
||||
String.valueOf(row),
|
||||
e,
|
||||
buildParseExceptionMessage(e.getMessage(), source(), null, metadata)
|
||||
));
|
||||
}
|
||||
catch (IOException e) {
|
||||
ParseException exception = new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
|
||||
StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", row),
|
||||
source(),
|
||||
null,
|
||||
metadata
|
||||
));
|
||||
return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
|
||||
}
|
||||
|
||||
return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and
|
||||
* {@link #toMap}.
|
||||
* {@link #toMap}. Either this or {@link #intermediateRowIteratorWithMetadata()} should be implemented
|
||||
*/
|
||||
protected abstract CloseableIterator<T> intermediateRowIterator() throws IOException;
|
||||
protected CloseableIterator<T> intermediateRowIterator() throws IOException
|
||||
{
|
||||
throw new UOE("intermediateRowIterator not implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as {@code intermediateRowIterator}, but it also contains the metadata such as the line number to generate
|
||||
* more informative {@link ParseException}.
|
||||
*/
|
||||
protected CloseableIteratorWithMetadata<T> intermediateRowIteratorWithMetadata() throws IOException
|
||||
{
|
||||
return CloseableIteratorWithMetadata.withEmptyMetadata(intermediateRowIterator());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return InputEntity which the implementation is reading from. Useful in generating informative {@link ParseException}s.
|
||||
* For example, in case of {@link org.apache.druid.data.input.impl.FileEntity}, file name containing erroneous records
|
||||
* or in case of {@link org.apache.druid.data.input.impl.HttpEntity}, the endpoint containing the erroneous data can
|
||||
* be attached to the error message
|
||||
*/
|
||||
@Nullable
|
||||
protected InputEntity source()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the given intermediate row into a list of {@link InputRow}s.
|
||||
@ -155,6 +259,37 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
|
||||
*/
|
||||
protected abstract List<Map<String, Object>> toMap(T intermediateRow) throws IOException;
|
||||
|
||||
/**
|
||||
* A helper method which enriches the base parse exception message with additional information. The returned message
|
||||
* has a format: "baseExceptionMessage (key1: value1, key2: value2)" if additional properties are present. Else it
|
||||
* returns the baseException message without any modification
|
||||
*/
|
||||
private static String buildParseExceptionMessage(
|
||||
@Nonnull String baseExceptionMessage,
|
||||
@Nullable InputEntity source,
|
||||
@Nullable Long recordNumber,
|
||||
@Nullable Map<String, Object> metadata
|
||||
)
|
||||
{
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (source != null && source.getUri() != null) {
|
||||
sb.append(StringUtils.format("Path: %s, ", source.getUri()));
|
||||
}
|
||||
if (recordNumber != null) {
|
||||
sb.append(StringUtils.format("Record: %d, ", recordNumber));
|
||||
}
|
||||
if (metadata != null) {
|
||||
metadata.entrySet().stream()
|
||||
.map(entry -> StringUtils.format("%s: %s, ", entry.getKey(), entry.getValue().toString()))
|
||||
.forEach(sb::append);
|
||||
}
|
||||
if (sb.length() == 0) {
|
||||
return baseExceptionMessage;
|
||||
}
|
||||
sb.setLength(sb.length() - 2); // Erase the last stray ", "
|
||||
return baseExceptionMessage + " (" + sb + ")"; // Wrap extra information in a bracket before returning
|
||||
}
|
||||
|
||||
private static class ExceptionThrowingIterator implements CloseableIterator<InputRow>
|
||||
{
|
||||
private final Exception exception;
|
||||
|
@ -22,14 +22,16 @@ package org.apache.druid.data.input;
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.commons.io.LineIterator;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
|
||||
import org.apache.druid.java.util.common.parsers.ParseException;
|
||||
import org.apache.druid.java.util.common.parsers.ParserUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Abstract {@link InputEntityReader} for text format readers such as CSV or JSON.
|
||||
@ -51,8 +53,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseableIterator<String> intermediateRowIterator()
|
||||
throws IOException
|
||||
public CloseableIteratorWithMetadata<String> intermediateRowIteratorWithMetadata() throws IOException
|
||||
{
|
||||
final LineIterator delegate = new LineIterator(
|
||||
new InputStreamReader(source.open(), StringUtils.UTF8_STRING)
|
||||
@ -65,8 +66,17 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
|
||||
processHeaderLine(delegate.nextLine());
|
||||
}
|
||||
|
||||
return new CloseableIterator<String>()
|
||||
return new CloseableIteratorWithMetadata<String>()
|
||||
{
|
||||
private static final String LINE_KEY = "Line";
|
||||
private long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
|
||||
|
||||
@Override
|
||||
public Map<String, Object> currentMetadata()
|
||||
{
|
||||
return Collections.singletonMap(LINE_KEY, currentLineNumber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
@ -76,6 +86,7 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
|
||||
@Override
|
||||
public String next()
|
||||
{
|
||||
currentLineNumber++;
|
||||
return delegate.nextLine();
|
||||
}
|
||||
|
||||
@ -87,6 +98,12 @@ public abstract class TextReader extends IntermediateRowParsingReader<String>
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the given line into a list of {@link InputRow}s. Note that some file formats can explode a single line of
|
||||
* input into multiple inputRows.
|
||||
|
@ -90,6 +90,12 @@ public class JsonReader extends IntermediateRowParsingReader<String>
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
|
||||
{
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.java.util.common.parsers;
|
||||
|
||||
import org.apache.druid.data.input.IntermediateRowParsingReader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Like {@link CloseableIterator}, but has a currentMetadata() method, which returns "metadata", which is effectively a Map<String, Object>
|
||||
* about the source of last value returned by next()
|
||||
*
|
||||
* The returned metadata is read-only and cannot be modified.
|
||||
*
|
||||
* This metadata can be used as additional information to pin-point the root cause of a parse exception.
|
||||
* So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
|
||||
* that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
|
||||
* The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
|
||||
*/
|
||||
public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
|
||||
{
|
||||
|
||||
/**
|
||||
* @return A map containing the information about the source of the last value returned by {@link #next()}
|
||||
*/
|
||||
Map<String, Object> currentMetadata();
|
||||
|
||||
/**
|
||||
* Creates an instance of CloseableIteratorWithMetadata from a {@link CloseableIterator}. {@link #currentMetadata()}
|
||||
* for the instance is guaranteed to return an empty map
|
||||
*/
|
||||
static <T> CloseableIteratorWithMetadata<T> withEmptyMetadata(CloseableIterator<T> delegate)
|
||||
{
|
||||
return new CloseableIteratorWithMetadata<T>()
|
||||
{
|
||||
|
||||
@Override
|
||||
public Map<String, Object> currentMetadata()
|
||||
{
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return delegate.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T next()
|
||||
{
|
||||
return delegate.next();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -111,6 +111,12 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
|
||||
{
|
||||
|
@ -71,6 +71,13 @@ public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
|
||||
{
|
||||
|
@ -1579,9 +1579,9 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
|
||||
"Unable to parse value[notanumber] for field[met1]",
|
||||
"could not convert value [notanumber] to float",
|
||||
"could not convert value [notanumber] to long",
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row",
|
||||
"Unable to parse row [unparseable]",
|
||||
"Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
|
||||
"Unable to parse row [unparseable] (Record: 1)",
|
||||
"Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}] (Record: 1)"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
@ -1665,8 +1665,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
|
||||
.get(RowIngestionMeters.BUILD_SEGMENTS);
|
||||
|
||||
List<String> expectedMessages = Arrays.asList(
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row",
|
||||
"Unable to parse row [unparseable]"
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
|
||||
"Unable to parse row [unparseable] (Record: 1)"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
|
@ -1351,10 +1351,10 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
|
||||
"Unable to parse value[notanumber] for field[met1]",
|
||||
"could not convert value [notanumber] to float",
|
||||
"could not convert value [notanumber] to long",
|
||||
"Timestamp[null] is unparseable! Event: {}",
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row",
|
||||
"Unable to parse row [unparseable]",
|
||||
"Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
|
||||
"Timestamp[null] is unparseable! Event: {} (Record: 1)",
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
|
||||
"Unable to parse row [unparseable] (Record: 1)",
|
||||
"Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}] (Record: 1)"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
@ -1457,8 +1457,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
|
||||
.get(RowIngestionMeters.BUILD_SEGMENTS);
|
||||
|
||||
List<String> expectedMessages = Arrays.asList(
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row",
|
||||
"Unable to parse row [unparseable]"
|
||||
"Unable to parse [] as the intermediateRow resulted in empty input row (Record: 1)",
|
||||
"Unable to parse row [unparseable] (Record: 1)"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
|
@ -143,6 +143,12 @@ public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(OrcStruct intermediateRow) throws ParseException
|
||||
{
|
||||
|
@ -132,6 +132,12 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(Group intermediateRow) throws ParseException
|
||||
{
|
||||
|
@ -84,6 +84,12 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException
|
||||
{
|
||||
|
@ -141,6 +141,12 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
|
||||
return makeCloseableIteratorFromSequenceAndSegmentFile(sequence, segmentFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
|
||||
{
|
||||
|
@ -1323,6 +1323,7 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
// report parse exception
|
||||
final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
|
||||
final IndexIngestionSpec indexIngestionSpec;
|
||||
List<String> expectedMessages;
|
||||
if (useInputFormatApi) {
|
||||
indexIngestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1336,6 +1337,12 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = ImmutableList.of(
|
||||
StringUtils.format(
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1} (Path: %s, Record: 1, Line: 2)",
|
||||
tmpFile.toURI()
|
||||
)
|
||||
);
|
||||
} else {
|
||||
indexIngestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1347,6 +1354,9 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = ImmutableList.of(
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}"
|
||||
);
|
||||
}
|
||||
|
||||
IndexTask indexTask = new IndexTask(
|
||||
@ -1366,9 +1376,6 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
.getUnparseableEvents()
|
||||
.get(RowIngestionMeters.BUILD_SEGMENTS);
|
||||
|
||||
List<String> expectedMessages = ImmutableList.of(
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
}).collect(Collectors.toList());
|
||||
@ -1500,39 +1507,42 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
);
|
||||
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
|
||||
|
||||
Map<String, Object> expectedUnparseables = ImmutableMap.of(
|
||||
RowIngestionMeters.DETERMINE_PARTITIONS,
|
||||
Arrays.asList(
|
||||
"Unable to parse row [this is not JSON]",
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
),
|
||||
RowIngestionMeters.BUILD_SEGMENTS,
|
||||
Arrays.asList(
|
||||
"Unable to parse row [this is not JSON]",
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
|
||||
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=4.0, val=notnumber}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [Unable to parse value[notnumber] for field[val]]",
|
||||
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=notnumber, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to float]",
|
||||
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long]",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
)
|
||||
);
|
||||
|
||||
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
|
||||
.getUnparseableEvents()
|
||||
.get(RowIngestionMeters.BUILD_SEGMENTS);
|
||||
|
||||
List<String> expectedMessages = Arrays.asList(
|
||||
"Unable to parse row [this is not JSON]",
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
|
||||
"Unable to parse value[notnumber] for field[val]",
|
||||
"could not convert value [notnumber] to float",
|
||||
"could not convert value [notnumber] to long",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
List<String> expectedMessages;
|
||||
if (useInputFormatApi) {
|
||||
expectedMessages = Arrays.asList(
|
||||
StringUtils.format("Unable to parse row [this is not JSON] (Path: %s, Record: 6, Line: 9)", tmpFile.toURI()),
|
||||
StringUtils.format(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 6, Line: 8)",
|
||||
tmpFile.toURI()
|
||||
),
|
||||
StringUtils.format(
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}] (Path: %s, Record: 5, Line: 6)",
|
||||
tmpFile.toURI()
|
||||
),
|
||||
"Unable to parse value[notnumber] for field[val]",
|
||||
"could not convert value [notnumber] to float",
|
||||
"could not convert value [notnumber] to long",
|
||||
StringUtils.format(
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 1)",
|
||||
tmpFile.toURI()
|
||||
)
|
||||
);
|
||||
} else {
|
||||
expectedMessages = Arrays.asList(
|
||||
"Unable to parse row [this is not JSON]",
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
|
||||
"Unable to parse value[notnumber] for field[val]",
|
||||
"could not convert value [notnumber] to float",
|
||||
"could not convert value [notnumber] to long",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
}
|
||||
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
}).collect(Collectors.toList());
|
||||
@ -1556,12 +1566,31 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
.getUnparseableEvents()
|
||||
.get(RowIngestionMeters.DETERMINE_PARTITIONS);
|
||||
|
||||
expectedMessages = Arrays.asList(
|
||||
"Unable to parse row [this is not JSON]",
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
if (useInputFormatApi) {
|
||||
expectedMessages = Arrays.asList(
|
||||
StringUtils.format("Unable to parse row [this is not JSON] (Path: %s, Record: 6, Line: 9)", tmpFile.toURI()),
|
||||
StringUtils.format(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 6, Line: 8)",
|
||||
tmpFile.toURI()
|
||||
),
|
||||
StringUtils.format(
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}] (Path: %s, Record: 5, Line: 6)",
|
||||
tmpFile.toURI()
|
||||
),
|
||||
StringUtils.format(
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 1)",
|
||||
tmpFile.toURI()
|
||||
)
|
||||
);
|
||||
} else {
|
||||
expectedMessages = Arrays.asList(
|
||||
"Unable to parse row [this is not JSON]",
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
}
|
||||
|
||||
actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
}).collect(Collectors.toList());
|
||||
@ -1634,6 +1663,8 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
);
|
||||
final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
|
||||
final IndexIngestionSpec ingestionSpec;
|
||||
|
||||
List<String> expectedMessages;
|
||||
if (useInputFormatApi) {
|
||||
ingestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1647,6 +1678,20 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = Arrays.asList(
|
||||
StringUtils.format(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 3, Line: 6)",
|
||||
tmpFile.toURI()
|
||||
),
|
||||
StringUtils.format(
|
||||
"Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)",
|
||||
tmpFile.toURI()
|
||||
),
|
||||
StringUtils.format(
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 2)",
|
||||
tmpFile.toURI()
|
||||
)
|
||||
);
|
||||
} else {
|
||||
ingestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1658,6 +1703,11 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = Arrays.asList(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
}
|
||||
|
||||
IndexTask indexTask = new IndexTask(
|
||||
@ -1696,11 +1746,6 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
.getUnparseableEvents()
|
||||
.get(RowIngestionMeters.BUILD_SEGMENTS);
|
||||
|
||||
List<String> expectedMessages = Arrays.asList(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
}).collect(Collectors.toList());
|
||||
@ -1771,6 +1816,8 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
);
|
||||
final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val");
|
||||
final IndexIngestionSpec ingestionSpec;
|
||||
|
||||
List<String> expectedMessages;
|
||||
if (useInputFormatApi) {
|
||||
ingestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1784,6 +1831,11 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = Arrays.asList(
|
||||
StringUtils.format("Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 3, Line: 6)", tmpFile.toURI()),
|
||||
StringUtils.format("Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)", tmpFile.toURI()),
|
||||
StringUtils.format("Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1, Line: 2)", tmpFile.toURI())
|
||||
);
|
||||
} else {
|
||||
ingestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1795,6 +1847,11 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = Arrays.asList(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
}
|
||||
|
||||
IndexTask indexTask = new IndexTask(
|
||||
@ -1833,11 +1890,6 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
.getUnparseableEvents()
|
||||
.get(RowIngestionMeters.DETERMINE_PARTITIONS);
|
||||
|
||||
List<String> expectedMessages = Arrays.asList(
|
||||
"Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
|
||||
"Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
}).collect(Collectors.toList());
|
||||
@ -1957,6 +2009,7 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
// report parse exception
|
||||
final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true);
|
||||
final IndexIngestionSpec ingestionSpec;
|
||||
List<String> expectedMessages;
|
||||
if (useInputFormatApi) {
|
||||
ingestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1970,6 +2023,12 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = ImmutableList.of(
|
||||
StringUtils.format(
|
||||
"Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1} (Path: %s, Record: 1, Line: 2)",
|
||||
tmpFile.toURI()
|
||||
)
|
||||
);
|
||||
} else {
|
||||
ingestionSpec = createIngestionSpec(
|
||||
jsonMapper,
|
||||
@ -1981,6 +2040,9 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
false,
|
||||
false
|
||||
);
|
||||
expectedMessages = ImmutableList.of(
|
||||
"Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
|
||||
);
|
||||
}
|
||||
|
||||
IndexTask indexTask = new IndexTask(
|
||||
@ -2001,9 +2063,6 @@ public class IndexTaskTest extends IngestionTestBase
|
||||
.getUnparseableEvents()
|
||||
.get(RowIngestionMeters.BUILD_SEGMENTS);
|
||||
|
||||
List<String> expectedMessages = ImmutableList.of(
|
||||
"Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
|
||||
);
|
||||
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
|
||||
return ((List<String>) r.get("details")).get(0);
|
||||
}).collect(Collectors.toList());
|
||||
|
@ -341,7 +341,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
|
||||
new ParseExceptionReport(
|
||||
"{ts=2017unparseable}",
|
||||
"unparseable",
|
||||
ImmutableList.of("Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}"),
|
||||
ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
|
||||
1L
|
||||
),
|
||||
new ParseExceptionReport(
|
||||
@ -462,7 +462,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
|
||||
new ParseExceptionReport(
|
||||
"{ts=2017unparseable}",
|
||||
"unparseable",
|
||||
ImmutableList.of("Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}"),
|
||||
ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
|
||||
1L
|
||||
),
|
||||
new ParseExceptionReport(
|
||||
@ -989,6 +989,14 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
|
||||
);
|
||||
}
|
||||
|
||||
private String getErrorMessageForUnparseableTimestamp()
|
||||
{
|
||||
return useInputFormatApi ? StringUtils.format(
|
||||
"Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable} (Path: %s, Record: 5, Line: 5)",
|
||||
new File(inputDir, "test_0").toURI()
|
||||
) : "Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable}";
|
||||
}
|
||||
|
||||
private static class SettableSplittableLocalInputSource extends LocalInputSource
|
||||
{
|
||||
private final boolean splittableInputSource;
|
||||
|
@ -186,7 +186,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(0),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(0).getInput())
|
||||
unparseableTimestampErrorString(data.get(0).getInput(), 1)
|
||||
),
|
||||
data.get(0)
|
||||
);
|
||||
@ -195,7 +195,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(1),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(1).getInput())
|
||||
unparseableTimestampErrorString(data.get(1).getInput(), 2)
|
||||
),
|
||||
data.get(1)
|
||||
);
|
||||
@ -204,7 +204,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(2),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(2).getInput())
|
||||
unparseableTimestampErrorString(data.get(2).getInput(), 3)
|
||||
),
|
||||
data.get(2)
|
||||
);
|
||||
@ -213,7 +213,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(3),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(3).getInput())
|
||||
unparseableTimestampErrorString(data.get(3).getInput(), 4)
|
||||
),
|
||||
data.get(3)
|
||||
);
|
||||
@ -222,7 +222,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(4),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(4).getInput())
|
||||
unparseableTimestampErrorString(data.get(4).getInput(), 5)
|
||||
),
|
||||
data.get(4)
|
||||
);
|
||||
@ -231,7 +231,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(5),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(5).getInput())
|
||||
unparseableTimestampErrorString(data.get(5).getInput(), 6)
|
||||
),
|
||||
data.get(5)
|
||||
);
|
||||
@ -259,7 +259,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(0),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(0).getInput())
|
||||
unparseableTimestampErrorString(data.get(0).getInput(), 1)
|
||||
),
|
||||
data.get(0)
|
||||
);
|
||||
@ -268,7 +268,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(1),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(1).getInput())
|
||||
unparseableTimestampErrorString(data.get(1).getInput(), 2)
|
||||
),
|
||||
data.get(1)
|
||||
);
|
||||
@ -277,7 +277,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
getRawColumns().get(2),
|
||||
null,
|
||||
true,
|
||||
unparseableTimestampErrorString(data.get(2).getInput())
|
||||
unparseableTimestampErrorString(data.get(2).getInput(), 3)
|
||||
),
|
||||
data.get(2)
|
||||
);
|
||||
@ -1248,7 +1248,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
//
|
||||
// first n rows are related to the first json block which fails to parse
|
||||
//
|
||||
String parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
|
||||
String parseExceptionMessage;
|
||||
if (useInputFormatApi) {
|
||||
parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
|
||||
} else {
|
||||
parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
|
||||
}
|
||||
for (; index < illegalRows; index++) {
|
||||
assertEqualsSamplerResponseRow(
|
||||
new SamplerResponseRow(
|
||||
@ -1436,14 +1441,24 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
|
||||
|
||||
private String getUnparseableTimestampString()
|
||||
{
|
||||
return ParserType.STR_CSV.equals(parserType)
|
||||
? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
|
||||
: "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
|
||||
if (useInputFormatApi) {
|
||||
return ParserType.STR_CSV.equals(parserType)
|
||||
? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6} (Line: 6)"
|
||||
: "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6} (Line: 6)";
|
||||
} else {
|
||||
return ParserType.STR_CSV.equals(parserType)
|
||||
? "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
|
||||
: "Timestamp[bad_timestamp] is unparseable! Event: {t=bad_timestamp, dim1=foo, met1=6}";
|
||||
}
|
||||
}
|
||||
|
||||
private String unparseableTimestampErrorString(Map<String, Object> rawColumns)
|
||||
private String unparseableTimestampErrorString(Map<String, Object> rawColumns, int line)
|
||||
{
|
||||
return StringUtils.format("Timestamp[null] is unparseable! Event: %s", rawColumns);
|
||||
if (useInputFormatApi) {
|
||||
return StringUtils.format("Timestamp[null] is unparseable! Event: %s (Line: %d)", rawColumns, line);
|
||||
} else {
|
||||
return StringUtils.format("Timestamp[null] is unparseable! Event: %s", rawColumns);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -75,6 +75,12 @@ public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
|
||||
return jsonIterator;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputEntity source()
|
||||
{
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user