MSQ: Improve the parse exception errors and the handling of null UTF characters in Strings in Frames (#14398)

Laksh Singla 2023-06-26 18:14:29 +05:30 committed by GitHub
parent 1647d5f4a0
commit 114380749d
18 changed files with 1071 additions and 190 deletions

View File

@ -428,7 +428,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.|
| <a name="error_InsertTimeOutOfBounds">`InsertTimeOutOfBounds`</a> | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.<br /> <br />To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp |
| <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte |
| <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column included a null byte. Null bytes in strings are not permitted. |`source`: The source that included the null byte <br /></br /> `rowNumber`: The row number (1-indexed) that included the null byte <br /><br /> `column`: The column that included the null byte <br /><br /> `value`: Actual string containing the null byte <br /><br /> `position`: Position (1-indexed) of occurrence of null byte|
| <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
| <a name="error_QueryRuntimeError">`QueryRuntimeError`</a> | MSQ uses the native query engine to run the leaf stages. This error tells MSQ that error is in native query runtime.<br /> <br /> Since this is a generic error, the user needs to look at logs for the error message and stack trace to figure out the next course of action. If the user is stuck, consider raising a `github` issue for assistance. | `baseErrorMessage` error message from the native query runtime. |
| <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |

View File

@ -20,33 +20,106 @@
package org.apache.druid.msq.indexing.error;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Fault thrown when the user tries to insert strings containing the \u0000 (NULL) byte, since that is unsupported
* by row-based frames
*/
@JsonTypeName(InvalidNullByteFault.CODE)
public class InvalidNullByteFault extends BaseMSQFault
{
static final String CODE = "InvalidNullByte";
private final String source;
private final Integer rowNumber;
private final String column;
private final String value;
private final Integer position;
/**
* All the parameters to the constructor can be null, in case we are unable to extract them from the site
* where the error was generated
* @param source source where the \u0000-containing string was found
* @param rowNumber row number (1-indexed) where the \u0000-containing string was found
* @param column column name where the \u0000-containing string was found
* @param value value of the \u0000-containing string
* @param position position (1-indexed) of \u0000 in the string. This is added in case the text viewer skips or
* doesn't render \u0000 correctly
*/
@JsonCreator
public InvalidNullByteFault(
@JsonProperty("column") final String column
@Nullable @JsonProperty("source") final String source,
@Nullable @JsonProperty("rowNumber") final Integer rowNumber,
@Nullable @JsonProperty("column") final String column,
@Nullable @JsonProperty("value") final String value,
@Nullable @JsonProperty("position") final Integer position
)
{
super(CODE, "Invalid null byte in string column [%s]", column);
super(
CODE,
"Invalid null byte at source [%s], rowNumber [%d], column[%s], value[%s], position[%d]. Consider sanitizing the string using REPLACE(\"%s\", U&'\\0000', '') AS %s",
source,
rowNumber,
column,
value,
position,
column,
column
);
this.source = source;
this.rowNumber = rowNumber;
this.column = column;
this.value = value;
this.position = position;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getSource()
{
return source;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getRowNumber()
{
return rowNumber;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getValue()
{
return value;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getColumn()
{
return column;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getPosition()
{
return position;
}
@Override
public boolean equals(Object o)
{
@ -60,12 +133,16 @@ public class InvalidNullByteFault extends BaseMSQFault
return false;
}
InvalidNullByteFault that = (InvalidNullByteFault) o;
return Objects.equals(column, that.column);
return Objects.equals(source, that.source)
&& Objects.equals(rowNumber, that.rowNumber)
&& Objects.equals(column, that.column)
&& Objects.equals(value, that.value)
&& Objects.equals(position, that.position);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), column);
return Objects.hash(super.hashCode(), source, rowNumber, column, value, position);
}
}
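
For reference, here is a minimal serde sketch in the spirit of `MSQFaultSerdeTest` further down in this commit. It assumes the `druid-multi-stage-query` classes are on the classpath and uses a plain `ObjectMapper` rather than the MSQ-configured mapper, so the exact JSON shape (for example, the `InvalidNullByte` type tag) may differ from what the controller reports.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.indexing.error.InvalidNullByteFault;

public class InvalidNullByteFaultSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    // Same arguments MSQFaultSerdeTest passes; any may be null, and the
    // NON_NULL inclusion on the getters drops missing details from the JSON.
    final InvalidNullByteFault fault =
        new InvalidNullByteFault("the source", 1, "the column", "the value", 2);
    System.out.println(new ObjectMapper().writeValueAsString(fault));
  }
}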

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
@ -192,6 +193,15 @@ public class MSQErrorReport
return new TooManyBucketsFault(((TooManyBucketsException) cause).getMaxBuckets());
} else if (cause instanceof FrameRowTooLargeException) {
return new RowTooLargeFault(((FrameRowTooLargeException) cause).getMaxFrameSize());
} else if (cause instanceof InvalidNullByteException) {
InvalidNullByteException invalidNullByteException = (InvalidNullByteException) cause;
return new InvalidNullByteFault(
invalidNullByteException.getSource(),
invalidNullByteException.getRowNumber(),
invalidNullByteException.getColumn(),
invalidNullByteException.getValue(),
invalidNullByteException.getPosition()
);
} else if (cause instanceof UnexpectedMultiValueDimensionException) {
return new QueryRuntimeFault(StringUtils.format(
"Column [%s] is a multi-value string. Please wrap the column using MV_TO_ARRAY() to proceed further.",

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.input;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.input.external.ExternalSegment;
import org.apache.druid.msq.input.inline.InlineInputSliceReader;
import org.apache.druid.query.lookup.LookupSegment;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import javax.annotation.Nullable;
/**
* Utility class containing methods that help in generating the {@link org.apache.druid.sql.calcite.parser.ParseException}
* in the frame processors
*/
public class ParseExceptionUtils
{
/**
* Given a segment, this returns a human-readable description of the segment, which allows the user to figure out the
* source of the parse exception
*/
@Nullable
public static String generateReadableInputSourceNameFromMappedSegment(Segment segment)
{
if (segment instanceof ExternalSegment) {
return StringUtils.format("external input source: %s", ((ExternalSegment) segment).externalInputSource().toString());
} else if (segment instanceof LookupSegment) {
return StringUtils.format("lookup input source: %s", segment.getId().getDataSource());
} else if (segment instanceof QueryableIndexSegment) {
return StringUtils.format("table input source: %s", segment.getId().getDataSource());
} else if (InlineInputSliceReader.SEGMENT_ID.equals(segment.getId().getDataSource())) {
return "inline input source";
}
return null;
}
}

View File

@ -23,7 +23,6 @@ import com.google.common.collect.Iterators;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
@ -33,15 +32,11 @@ import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.CountableInputSourceReader;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.NilInputSource;
@ -49,18 +44,16 @@ import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.timeline.SegmentId;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -69,6 +62,7 @@ import java.util.stream.Collectors;
*/
public class ExternalInputSliceReader implements InputSliceReader
{
public static final String SEGMENT_ID = "__external";
private final File temporaryDirectory;
public ExternalInputSliceReader(final File temporaryDirectory)
@ -153,86 +147,16 @@ public class ExternalInputSliceReader implements InputSliceReader
reader = inputSource.reader(schema, inputFormat, temporaryDirectory);
}
final SegmentId segmentId = SegmentId.dummy("dummy");
final RowBasedSegment<InputRow> segment = new RowBasedSegment<>(
segmentId,
new BaseSequence<>(
new BaseSequence.IteratorMaker<InputRow, CloseableIterator<InputRow>>()
{
@Override
public CloseableIterator<InputRow> make()
{
try {
CloseableIterator<InputRow> baseIterator = reader.read(inputStats);
return new CloseableIterator<InputRow>()
{
private InputRow next = null;
@Override
public void close() throws IOException
{
baseIterator.close();
}
@Override
public boolean hasNext()
{
while (true) {
try {
while (next == null && baseIterator.hasNext()) {
next = baseIterator.next();
}
break;
}
catch (ParseException e) {
warningCounters.incrementWarningCount(CannotParseExternalDataFault.CODE);
warningPublisher.accept(e);
}
}
return next != null;
}
@Override
public InputRow next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final InputRow row = next;
next = null;
return row;
}
};
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void cleanup(CloseableIterator<InputRow> iterFromMake)
{
try {
iterFromMake.close();
// We increment the file count whenever the caller calls clean up. So we can double count here
// if the callers are not careful.
// This logic only works because we are using FilePerSplitHintSpec. Each input source only
// has one file.
if (incrementCounters) {
channelCounters.incrementFileCount();
channelCounters.incrementBytes(inputStats.getProcessedBytes());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
),
RowAdapters.standardRow(),
final SegmentId segmentId = SegmentId.dummy(SEGMENT_ID);
final Segment segment = new ExternalSegment(
inputSource,
reader,
inputStats,
warningCounters,
warningPublisher,
channelCounters,
signature
);
return new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
segmentId.toDescriptor()
@ -241,7 +165,7 @@ public class ExternalInputSliceReader implements InputSliceReader
);
}
static boolean isFileBasedInputSource(final InputSource inputSource)
public static boolean isFileBasedInputSource(final InputSource inputSource)
{
return !(inputSource instanceof NilInputSource) && !(inputSource instanceof InlineInputSource);
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.input.external;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
/**
* Segment representing the rows read from an external source. This is currently returned when using EXTERN with MSQ
*/
public class ExternalSegment extends RowBasedSegment<InputRow>
{
private final InputSource inputSource;
public static final String SEGMENT_ID = "__external";
/**
* @param inputSource {@link InputSource} that the segment is a representation of
* @param reader reader to read the external input source
* @param inputStats input stats
* @param warningCounters warning counters tracking the warnings generated while reading the external source
* @param warningPublisher publisher to report the warnings generated
* @param channelCounters channel counters to increment as we read through the files/units of the external source
* @param signature signature of the external source
*/
public ExternalSegment(
final InputSource inputSource,
final InputSourceReader reader,
final InputStats inputStats,
final WarningCounters warningCounters,
final Consumer<Throwable> warningPublisher,
final ChannelCounters channelCounters,
final RowSignature signature
)
{
super(
SegmentId.dummy(SEGMENT_ID),
new BaseSequence<>(
new BaseSequence.IteratorMaker<InputRow, CloseableIterator<InputRow>>()
{
@Override
public CloseableIterator<InputRow> make()
{
try {
CloseableIterator<InputRow> baseIterator = reader.read(inputStats);
return new CloseableIterator<InputRow>()
{
private InputRow next = null;
@Override
public void close() throws IOException
{
baseIterator.close();
}
@Override
public boolean hasNext()
{
while (true) {
try {
while (next == null && baseIterator.hasNext()) {
next = baseIterator.next();
}
break;
}
catch (ParseException e) {
warningCounters.incrementWarningCount(CannotParseExternalDataFault.CODE);
warningPublisher.accept(e);
}
}
return next != null;
}
@Override
public InputRow next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final InputRow row = next;
next = null;
return row;
}
};
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void cleanup(CloseableIterator<InputRow> iterFromMake)
{
try {
iterFromMake.close();
// We increment the file count whenever the caller calls clean up. So we can double count here
// if the callers are not careful.
// This logic only works because we are using FilePerSplitHintSpec. Each input source only
// has one file.
if (ExternalInputSliceReader.isFileBasedInputSource(inputSource)) {
channelCounters.incrementFileCount();
channelCounters.incrementBytes(inputStats.getProcessedBytes());
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
),
RowAdapters.standardRow(),
signature
);
this.inputSource = inputSource;
}
/**
* Return the input source that the segment is a representation of
*/
public InputSource externalInputSource()
{
return inputSource;
}
}

View File

@ -42,7 +42,8 @@ import java.util.function.Consumer;
*/
public class InlineInputSliceReader implements InputSliceReader
{
private static final SegmentDescriptor DUMMY_SEGMENT_DESCRIPTOR = SegmentId.dummy("inline").toDescriptor();
public static final String SEGMENT_ID = "__inline";
private static final SegmentDescriptor DUMMY_SEGMENT_DESCRIPTOR = SegmentId.dummy(SEGMENT_ID).toDescriptor();
private final SegmentWrangler segmentWrangler;

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit.scan;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.RowIdSupplier;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
/**
* A column selector factory wrapper which wraps the underlying factory's errors into a {@link ParseException}.
* This is used when reading from external data, since failure to read the data is usually an issue with the external
* input
*/
public class ExternalColumnSelectorFactory implements ColumnSelectorFactory
{
private static final String ERROR_MESSAGE_FORMAT_STRING =
"Error while trying to read the external data source at inputSource [%s], rowNumber [%d], columnName [%s]";
private final ColumnSelectorFactory delegate;
private final InputSource inputSource;
private final SimpleSettableOffset offset;
public ExternalColumnSelectorFactory(
final ColumnSelectorFactory delegate,
final InputSource inputSource,
final SimpleSettableOffset offset
)
{
this.delegate = delegate;
this.inputSource = inputSource;
this.offset = offset;
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return new DimensionSelector()
{
final DimensionSelector delegateDimensionSelector = delegate.makeDimensionSelector(dimensionSpec);
@Override
public IndexedInts getRow()
{
return delegateDimensionSelector.getRow();
}
@Override
public ValueMatcher makeValueMatcher(@Nullable String value)
{
return delegateDimensionSelector.makeValueMatcher(value);
}
@Override
public ValueMatcher makeValueMatcher(Predicate<String> predicate)
{
return delegateDimensionSelector.makeValueMatcher(predicate);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
delegateDimensionSelector.inspectRuntimeShape(inspector);
}
@Nullable
@Override
public Object getObject()
{
try {
return delegateDimensionSelector.getObject();
}
catch (Exception e) {
throw createException(e, dimensionSpec.getDimension(), inputSource, offset);
}
}
@Override
public Class<?> classOfObject()
{
return delegateDimensionSelector.classOfObject();
}
@Override
public int getValueCardinality()
{
return delegateDimensionSelector.getValueCardinality();
}
@Nullable
@Override
public String lookupName(int id)
{
return delegateDimensionSelector.lookupName(id);
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return delegateDimensionSelector.nameLookupPossibleInAdvance();
}
@Nullable
@Override
public IdLookup idLookup()
{
return delegateDimensionSelector.idLookup();
}
};
}
@Override
public ColumnValueSelector makeColumnValueSelector(String columnName)
{
return new ColumnValueSelector()
{
final ColumnValueSelector delegateColumnValueSelector = delegate.makeColumnValueSelector(columnName);
@Override
public double getDouble()
{
try {
return delegateColumnValueSelector.getDouble();
}
catch (Exception e) {
throw createException(e, columnName, inputSource, offset);
}
}
@Override
public float getFloat()
{
try {
return delegateColumnValueSelector.getFloat();
}
catch (Exception e) {
throw createException(e, columnName, inputSource, offset);
}
}
@Override
public long getLong()
{
try {
return delegateColumnValueSelector.getLong();
}
catch (Exception e) {
throw createException(e, columnName, inputSource, offset);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
delegateColumnValueSelector.inspectRuntimeShape(inspector);
}
@Override
public boolean isNull()
{
return delegateColumnValueSelector.isNull();
}
@Nullable
@Override
public Object getObject()
{
try {
return delegateColumnValueSelector.getObject();
}
catch (Exception e) {
throw createException(e, columnName, inputSource, offset);
}
}
@Override
public Class classOfObject()
{
return delegateColumnValueSelector.classOfObject();
}
};
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return delegate.getColumnCapabilities(column);
}
@Nullable
@Override
public RowIdSupplier getRowIdSupplier()
{
return delegate.getRowIdSupplier();
}
public static ParseException createException(
Exception cause,
String columnName,
InputSource inputSource,
SimpleSettableOffset offset
)
{
return new ParseException(
null,
cause,
ERROR_MESSAGE_FORMAT_STRING,
inputSource.toString(),
(long) (offset.getOffset()) + 1,
columnName
);
}
}
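
The static `createException` helper is also what `MSQParseExceptionsTest` uses to build its expected error message. A minimal sketch of calling it directly, assuming the MSQ and core Druid artifacts are on the classpath (the file path is illustrative):

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.msq.querykit.scan.ExternalColumnSelectorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;

import java.io.File;

public class ExternalParseExceptionSketch
{
  public static void main(String[] args)
  {
    // Wraps a low-level failure into a ParseException naming the input
    // source, the 1-indexed row number, and the column being read.
    System.out.println(
        ExternalColumnSelectorFactory.createException(
            new Exception("dummy"),
            "v1",
            new LocalInputSource(null, null, ImmutableList.of(new File("/tmp/data.csv"))),
            new SimpleAscendingOffset(Integer.MAX_VALUE)
        ).getMessage()
    );
  }
}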

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.querykit.scan;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntSet;
@ -37,6 +38,7 @@ import org.apache.druid.frame.segment.FrameSegment;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -44,7 +46,9 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.input.ParseExceptionUtils;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.external.ExternalSegment;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.msq.querykit.QueryKitUtils;
@ -55,6 +59,8 @@ import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
@ -82,6 +88,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
private long rowsOutput = 0;
private Cursor cursor;
private Segment segment;
private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE);
private FrameWriter frameWriter;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
@ -164,13 +172,13 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
cursorYielder.close();
return ReturnOrAwait.returnObject(rowsOutput);
} else {
final long rowsFlushed = setNextCursor(cursorYielder.get());
final long rowsFlushed = setNextCursor(cursorYielder.get(), segmentHolder.get());
assert rowsFlushed == 0; // There's only ever one cursor when running with a segment
closer.register(cursorYielder);
}
}
populateFrameWriterAndFlushIfNeeded();
populateFrameWriterAndFlushIfNeededWithExceptionHandling();
if (cursor.isDone()) {
flushFrameWriter();
@ -200,7 +208,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
mapSegment(frameSegment).asStorageAdapter()
).toList()
)
),
frameSegment
);
if (rowsFlushed > 0) {
@ -215,7 +224,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
// Cursor has some more data in it.
populateFrameWriterAndFlushIfNeeded();
populateFrameWriterAndFlushIfNeededWithExceptionHandling();
if (cursor.isDone()) {
return ReturnOrAwait.awaitAll(inputChannels().size());
@ -224,6 +233,26 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
}
/**
* Calls {@link #populateFrameWriterAndFlushIfNeeded()}, enriching any {@link InvalidNullByteException} with the
* input source name and the row number
*/
private void populateFrameWriterAndFlushIfNeededWithExceptionHandling()
{
try {
populateFrameWriterAndFlushIfNeeded();
}
catch (InvalidNullByteException inbe) {
InvalidNullByteException.Builder builder = InvalidNullByteException.builder(inbe);
throw
builder.source(ParseExceptionUtils.generateReadableInputSourceNameFromMappedSegment(this.segment)) // frame segment
.rowNumber(this.cursorOffset.getOffset() + 1)
.build();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private void populateFrameWriterAndFlushIfNeeded() throws IOException
{
createFrameWriterIfNeeded();
@ -244,6 +273,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
cursor.advance();
cursorOffset.increment();
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
}
}
@ -253,7 +283,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
if (frameWriter == null) {
final FrameWriterFactory frameWriterFactory = getFrameWriterFactory();
final ColumnSelectorFactory frameWriterColumnSelectorFactory =
frameWriterVirtualColumns.wrap(cursor.getColumnSelectorFactory());
wrapColumnSelectorFactoryIfNeeded(frameWriterVirtualColumns.wrap(cursor.getColumnSelectorFactory()));
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
@ -278,13 +308,30 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
}
private long setNextCursor(final Cursor cursor) throws IOException
private long setNextCursor(final Cursor cursor, final Segment segment) throws IOException
{
final long rowsFlushed = flushFrameWriter();
this.cursor = cursor;
this.segment = segment;
this.cursorOffset.reset();
return rowsFlushed;
}
/**
* Wraps the column selector factory if the underlying input to the processor is an external source
*/
private ColumnSelectorFactory wrapColumnSelectorFactoryIfNeeded(final ColumnSelectorFactory baseColumnSelectorFactory)
{
if (segment instanceof ExternalSegment) {
return new ExternalColumnSelectorFactory(
baseColumnSelectorFactory,
((ExternalSegment) segment).externalInputSource(),
cursorOffset
);
}
return baseColumnSelectorFactory;
}
private static Sequence<Cursor> makeCursors(final ScanQuery query, final StorageAdapter adapter)
{
if (adapter == null) {

View File

@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
import org.apache.druid.msq.querykit.scan.ExternalColumnSelectorFactory;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class MSQParseExceptionsTest extends MSQTestBase
{
@Test
public void testIngestWithNullByte() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(
temporaryFolder,
this,
"/unparseable-null-byte-string.csv"
);
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("agent_category", ColumnType.STRING)
.build();
testIngestQuery()
.setSql("INSERT INTO foo1\n"
+ "WITH\n"
+ "kttm_data AS (\n"
+ "SELECT * FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\":\"csv\", \"findColumnsFromHeader\":true}',\n"
+ " '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"}]'\n"
+ " )\n"
+ "))\n"
+ "\n"
+ "SELECT\n"
+ " FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+ " \"agent_category\"\n"
+ "FROM kttm_data\n"
+ "PARTITIONED BY ALL")
.setExpectedRowSignature(rowSignature)
.setExpectedDataSource("foo1")
.setExpectedMSQFault(
new InvalidNullByteFault(
StringUtils.format(
"external input source: LocalInputSource{baseDir=\"null\", filter=null, files=[%s]}",
toRead.getAbsolutePath()
),
1,
"agent_category",
"Personal computer\u0000",
17
)
)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@Test
public void testIngestWithSanitizedNullByte() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(
temporaryFolder,
this,
"/unparseable-null-byte-string.csv"
);
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("agent_category", ColumnType.STRING)
.build();
Map<String, Object> context = new HashMap<>(DEFAULT_MSQ_CONTEXT);
context.put("sqlInsertSegmentGranularity", "{\"type\":\"all\"}");
final ScanQuery expectedQuery =
newScanQueryBuilder()
.dataSource(
new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(toRead)),
new CsvInputFormat(null, null, null, true, 0),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("agent_category", ColumnType.STRING)
.add("agent_type", ColumnType.STRING)
.add("browser", ColumnType.STRING)
.build()
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn(
"v0",
"timestamp_floor(timestamp_parse(\"timestamp\",null,'UTC'),'PT1M',null,'UTC')",
ColumnType.LONG
),
expressionVirtualColumn(
"v1",
"replace(\"agent_category\",'\\u0000','')",
ColumnType.STRING
)
)
.columns("v0", "v1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(defaultScanQueryContext(
context,
RowSignature.builder()
.add("v0", ColumnType.LONG)
.add("v1", ColumnType.STRING)
.build()
))
.build();
testIngestQuery()
.setSql("INSERT INTO foo1\n"
+ "WITH\n"
+ "kttm_data AS (\n"
+ "SELECT * FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\":\"csv\", \"findColumnsFromHeader\":true}',\n"
+ " '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"}]'\n"
+ " )\n"
+ "))\n"
+ "\n"
+ "SELECT\n"
+ " FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+ " REPLACE(\"agent_category\", U&'\\0000', '') as \"agent_category\"\n"
+ "FROM kttm_data\n"
+ "PARTITIONED BY ALL")
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1566691200000L, "Personal computer"},
new Object[]{1566691200000L, "Personal computer"},
new Object[]{1566691200000L, "Smartphone"}
))
.setExpectedDataSource("foo1")
.setExpectedMSQSpec(
MSQSpec
.builder()
.query(expectedQuery)
.columnMappings(new ColumnMappings(
ImmutableList.of(
new ColumnMapping("v0", "__time"),
new ColumnMapping("v1", "agent_category")
)
))
.destination(new DataSourceMSQDestination("foo1", Granularities.ALL, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@Test
public void testMultiValueStringWithIncorrectType() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(
temporaryFolder,
this,
"/unparseable-mv-string-array.json"
);
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("language", ColumnType.STRING_ARRAY)
.build();
final GroupByQuery expectedQuery =
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("__time", "d0", ColumnType.LONG)))
.build();
testSelectQuery()
.setSql("WITH\n"
+ "kttm_data AS (\n"
+ "SELECT * FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"},{\"name\":\"browser_version\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"event_subtype\",\"type\":\"string\"},{\"name\":\"loaded_image\",\"type\":\"string\"},{\"name\":\"adblock_list\",\"type\":\"string\"},{\"name\":\"forwarded_for\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"os\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"},{\"name\":\"platform\",\"type\":\"string\"},{\"name\":\"referrer\",\"type\":\"string\"},{\"name\":\"referrer_host\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"remote_address\",\"type\":\"string\"},{\"name\":\"screen\",\"type\":\"string\"},{\"name\":\"session\",\"type\":\"string\"},{\"name\":\"session_length\",\"type\":\"long\"},{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"timezone_offset\",\"type\":\"long\"},{\"name\":\"window\",\"type\":\"string\"}]'\n"
+ " )\n"
+ "))\n"
+ "\n"
+ "SELECT\n"
+ " FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+ " MV_TO_ARRAY(\"language\") AS \"language\"\n"
+ "FROM kttm_data")
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1566691200000L, ImmutableList.of("en")},
new Object[]{1566691200000L, ImmutableList.of("en", "es", "es-419", "es-MX")},
new Object[]{1566691200000L, ImmutableList.of("en", "es", "es-419", "es-US")}
))
.setExpectedMSQSpec(
MSQSpec
.builder()
.query(expectedQuery)
.columnMappings(new ColumnMappings(
ImmutableList.of(
new ColumnMapping("d0", "__time"),
new ColumnMapping("a0", "cnt")
)
))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedMSQFault(
new CannotParseExternalDataFault(
ExternalColumnSelectorFactory
.createException(
new Exception("dummy"),
"v1",
new LocalInputSource(null, null, ImmutableList.of(toRead)),
new SimpleAscendingOffset(Integer.MAX_VALUE)
)
.getMessage()
)
)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
}

View File

@ -37,7 +37,6 @@ import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
@ -1833,69 +1832,6 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
@Test
public void testMultiValueStringWithIncorrectType() throws IOException
{
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(
temporaryFolder,
this,
"/unparseable-mv-string-array.json"
);
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("language", ColumnType.STRING_ARRAY)
.build();
final GroupByQuery expectedQuery =
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(dimensions(new DefaultDimensionSpec("__time", "d0", ColumnType.LONG)))
.build();
testSelectQuery()
.setSql("WITH\n"
+ "kttm_data AS (\n"
+ "SELECT * FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"},{\"name\":\"browser_version\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"event_subtype\",\"type\":\"string\"},{\"name\":\"loaded_image\",\"type\":\"string\"},{\"name\":\"adblock_list\",\"type\":\"string\"},{\"name\":\"forwarded_for\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"os\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"},{\"name\":\"platform\",\"type\":\"string\"},{\"name\":\"referrer\",\"type\":\"string\"},{\"name\":\"referrer_host\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"remote_address\",\"type\":\"string\"},{\"name\":\"screen\",\"type\":\"string\"},{\"name\":\"session\",\"type\":\"string\"},{\"name\":\"session_length\",\"type\":\"long\"},{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"timezone_offset\",\"type\":\"long\"},{\"name\":\"window\",\"type\":\"string\"}]'\n"
+ " )\n"
+ "))\n"
+ "\n"
+ "SELECT\n"
+ " FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+ " MV_TO_ARRAY(\"language\") AS \"language\"\n"
+ "FROM kttm_data")
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1566691200000L, ImmutableList.of("en")},
new Object[]{1566691200000L, ImmutableList.of("en", "es", "es-419", "es-MX")},
new Object[]{1566691200000L, ImmutableList.of("en", "es", "es-419", "es-US")}
))
.setExpectedMSQSpec(
MSQSpec
.builder()
.query(expectedQuery)
.columnMappings(new ColumnMappings(
ImmutableList.of(
new ColumnMapping("d0", "__time"),
new ColumnMapping("a0", "cnt")
)
))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setExpectedMSQFault(new CannotParseExternalDataFault(
"Unable to add the row to the frame. Type conversion might be required."))
.setQueryContext(context)
.verifyResults();
}
@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{

View File

@ -59,7 +59,7 @@ public class MSQFaultSerdeTest
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
assertFaultSerde(new InvalidNullByteFault("the column"));
assertFaultSerde(new InvalidNullByteFault("the source", 1, "the column", "the value", 2));
assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
assertFaultSerde(new QueryRuntimeFault("new error", "base error"));

Binary file not shown.
timestamp,agent_category,agent_type,browser
2019-08-25T00:00:00.031Z,Personal computer�,Browser,Chrome
2019-08-25T00:00:00.059Z,Smartphone,Mobile Browser,Chrome Mobile
2019-08-25T00:00:00.178Z,Personal computer,Browser,Chrome

View File

@ -229,7 +229,12 @@ public class FrameWriterUtils
final byte b = src.get(p);
if (!allowNullBytes && b == 0) {
throw new InvalidNullByteException();
ByteBuffer duplicate = src.duplicate();
duplicate.limit(srcEnd);
throw InvalidNullByteException.builder()
.value(StringUtils.fromUtf8(duplicate))
.position(p - src.position())
.build();
}
dst.putByte(q, b);

View File

@ -19,10 +19,160 @@
package org.apache.druid.frame.write;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
/**
* Exception thrown by {@link FrameWriterUtils#copyByteBufferToMemory} if configured to check for null bytes
* and a null byte is encountered.
*/
public class InvalidNullByteException extends RuntimeException
{
@Nullable
private final String source;
@Nullable
private final Integer rowNumber;
@Nullable
private final String column;
@Nullable
private final String value;
@Nullable
private final Integer position;
private InvalidNullByteException(
@Nullable final String source,
@Nullable final Integer rowNumber,
@Nullable final String column,
@Nullable final String value,
@Nullable final Integer position
)
{
super(StringUtils.format(
"Encountered null byte at source[%s], rowNumber[%d], column[%s], value[%s], position[%s]",
source,
rowNumber,
column,
value,
position
));
this.source = source;
this.rowNumber = rowNumber;
this.column = column;
this.value = value;
this.position = position;
}
public static Builder builder()
{
return new Builder();
}
public static Builder builder(InvalidNullByteException invalidNullByteException)
{
return new Builder(invalidNullByteException);
}
@Nullable
public Integer getRowNumber()
{
return rowNumber;
}
@Nullable
public String getColumn()
{
return column;
}
@Nullable
public String getValue()
{
return value;
}
@Nullable
public String getSource()
{
return source;
}
@Nullable
public Integer getPosition()
{
return position;
}
public static class Builder
{
@Nullable
private String source;
@Nullable
private Integer rowNumber;
@Nullable
private String column;
@Nullable
private String value;
@Nullable
private Integer position;
public Builder()
{
}
public Builder(InvalidNullByteException invalidNullByteException)
{
this.source = invalidNullByteException.source;
this.rowNumber = invalidNullByteException.rowNumber;
this.column = invalidNullByteException.column;
this.value = invalidNullByteException.value;
this.position = invalidNullByteException.position;
}
public InvalidNullByteException build()
{
return new InvalidNullByteException(source, rowNumber, column, value, position);
}
public Builder source(final String source)
{
this.source = source;
return this;
}
public Builder rowNumber(final Integer rowNumber)
{
this.rowNumber = rowNumber;
return this;
}
public Builder column(final String column)
{
this.column = column;
return this;
}
public Builder value(final String value)
{
this.value = value;
return this;
}
public Builder position(final Integer position)
{
this.position = position;
return this;
}
}
}
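
The copy-constructor form of the builder is what lets each layer add only what it knows: `FrameWriterUtils` sees the value and byte position, `RowBasedFrameWriter` adds the column from its row signature, and `ScanQueryFrameProcessor` adds the readable source name and row number. A minimal sketch of that layering, with illustrative values borrowed from this commit's test fixture:

import org.apache.druid.frame.write.InvalidNullByteException;

public class NullByteEnrichmentSketch
{
  public static void main(String[] args)
  {
    // Innermost layer: FrameWriterUtils knows only the offending string
    // and the position of the null byte within it.
    final InvalidNullByteException fromCopy = InvalidNullByteException.builder()
        .value("Personal computer\u0000")
        .position(17)
        .build();

    // RowBasedFrameWriter catches and rebuilds, adding the column name.
    final InvalidNullByteException withColumn = InvalidNullByteException.builder(fromCopy)
        .column("agent_category")
        .build();

    // ScanQueryFrameProcessor adds the readable source and row number.
    final InvalidNullByteException enriched = InvalidNullByteException.builder(withColumn)
        .source("external input source: ...")
        .rowNumber(1)
        .build();

    System.out.println(enriched.getMessage());
  }
}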

View File

@ -33,7 +33,6 @@ import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.utils.CloseableUtils;
@ -120,14 +119,8 @@ public class RowBasedFrameWriter implements FrameWriter
return false;
}
try {
if (!writeData()) {
return false;
}
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, ParseException.class);
throw new ParseException("", e, "Unable to add the row to the frame. Type conversion might be required.");
if (!writeData()) {
return false;
}
final MemoryRange<WritableMemory> rowOffsetCursor = rowOffsetMemory.cursor();
@ -294,11 +287,22 @@ public class RowBasedFrameWriter implements FrameWriter
final long writeResult;
// May throw InvalidNullByteException; allow it to propagate upwards.
writeResult = fieldWriter.writeTo(
dataCursor.memory(),
dataCursor.start() + bytesWritten,
remainingInBlock - bytesWritten
);
try {
writeResult = fieldWriter.writeTo(
dataCursor.memory(),
dataCursor.start() + bytesWritten,
remainingInBlock - bytesWritten
);
}
catch (InvalidNullByteException inbe) {
throw InvalidNullByteException.builder(inbe)
.column(signature.getColumnName(i))
.build();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
if (writeResult < 0) {
// Reset to beginning of loop.

View File

@ -19,7 +19,6 @@
package org.apache.druid.frame.write.columnar;
import com.google.common.base.Throwables;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
@ -31,7 +30,6 @@ import org.apache.druid.frame.write.FrameSort;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@ -76,17 +74,11 @@ public class ColumnarFrameWriter implements FrameWriter
}
int i = 0;
try {
for (; i < columnWriters.size(); i++) {
if (!columnWriters.get(i).addSelection()) {
break;
}
for (; i < columnWriters.size(); i++) {
if (!columnWriters.get(i).addSelection()) {
break;
}
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, ParseException.class);
throw new ParseException("", e, "Unable to add the row to the frame. Type conversion might be required.");
}
if (i < columnWriters.size()) {
// Add failed, clean up.

View File

@ -130,6 +130,8 @@ public abstract class StringFrameColumnWriter<T extends ColumnValueSelector> imp
if (len > 0) {
assert stringDataCursor != null; // Won't be null when len > 0, since utf8DataByteLength would be > 0.
// Since we allow null bytes, this call wouldn't throw InvalidNullByteException
FrameWriterUtils.copyByteBufferToMemory(
utf8Datum,
stringDataCursor.memory(),