Display the output column name in InvalidNullByteException (#14780)

When surfacing an InvalidNullByteFault, this PR maps the internal query column name to the corresponding output column name, since the output column name is the one the user wrote in the SQL query and can readily recognize.
This commit is contained in:
Laksh Singla 2023-08-24 04:24:41 +00:00 committed by GitHub
parent 36e659a501
commit f9f734cde5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 8 deletions

View File

@ -55,6 +55,7 @@ import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader; import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.processor.FrameProcessorExecutor; import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.frame.util.DurableStorageUtils;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.LockGranularity;
@ -104,6 +105,7 @@ import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport; import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault; import org.apache.druid.msq.indexing.error.MSQFault;
@ -412,9 +414,15 @@ public class ControllerImpl implements Controller
final String selfHost = MSQTasks.getHostFromSelfNode(selfDruidNode); final String selfHost = MSQTasks.getHostFromSelfNode(selfDruidNode);
final MSQErrorReport controllerError = final MSQErrorReport controllerError =
exceptionEncountered != null exceptionEncountered != null
? MSQErrorReport.fromException(id(), selfHost, null, exceptionEncountered) ? MSQErrorReport.fromException(
id(),
selfHost,
null,
exceptionEncountered,
task.getQuerySpec().getColumnMappings()
)
: null; : null;
final MSQErrorReport workerError = workerErrorRef.get(); MSQErrorReport workerError = workerErrorRef.get();
taskStateForReport = TaskState.FAILED; taskStateForReport = TaskState.FAILED;
errorForReport = MSQTasks.makeErrorReport(id(), selfHost, controllerError, workerError); errorForReport = MSQTasks.makeErrorReport(id(), selfHost, controllerError, workerError);
@ -748,7 +756,10 @@ public class ControllerImpl implements Controller
!workerTaskLauncher.isTaskLatest(errorReport.getTaskId())) { !workerTaskLauncher.isTaskLatest(errorReport.getTaskId())) {
log.info("Ignoring task %s", errorReport.getTaskId()); log.info("Ignoring task %s", errorReport.getTaskId());
} else { } else {
workerErrorRef.compareAndSet(null, errorReport); workerErrorRef.compareAndSet(
null,
mapQueryColumnNameToOutputColumnName(errorReport)
);
} }
} }
@ -2651,6 +2662,39 @@ public class ControllerImpl implements Controller
return mergeMode; return mergeMode;
} }
/**
 * Translates query column names (used internally while generating the query plan) into output column names (the
 * ones the user wrote in the SQL query) for worker-reported errors. Workers have limited knowledge of the
 * ColumnMappings, so the translation is performed here using the query spec's column mappings.
 *
 * Only reports carrying an {@link InvalidNullByteFault} are rebuilt. A null report yields null, and any other
 * report is returned as is.
 */
@Nullable
private MSQErrorReport mapQueryColumnNameToOutputColumnName(
    @Nullable final MSQErrorReport workerErrorReport
)
{
  if (workerErrorReport == null) {
    return null;
  }

  if (!(workerErrorReport.getFault() instanceof InvalidNullByteFault)) {
    // This fault does not depend on query column names, so there is nothing to translate.
    return workerErrorReport;
  }

  final InvalidNullByteFault nullByteFault = (InvalidNullByteFault) workerErrorReport.getFault();

  // Recreate the exception from the fault's fields so that the report can be regenerated with the column
  // mappings applied.
  final Throwable rebuiltException =
      InvalidNullByteException.builder()
                              .source(nullByteFault.getSource())
                              .rowNumber(nullByteFault.getRowNumber())
                              .column(nullByteFault.getColumn())
                              .value(nullByteFault.getValue())
                              .position(nullByteFault.getPosition())
                              .build();

  return MSQErrorReport.fromException(
      workerErrorReport.getTaskId(),
      workerErrorReport.getHost(),
      workerErrorReport.getStageNumber(),
      rebuiltException,
      task.getQuerySpec().getColumnMappings()
  );
}
/** /**
* Interface used by {@link #contactWorkersForStage}. * Interface used by {@link #contactWorkersForStage}.

View File

@ -63,7 +63,8 @@ public class InvalidNullByteFault extends BaseMSQFault
{ {
super( super(
CODE, CODE,
"Invalid null byte at source [%s], rowNumber [%d], column[%s], value[%s], position[%d]. Consider sanitizing the string using REPLACE(\"%s\", U&'\\0000', '') AS %s", "Invalid null byte at source [%s], rowNumber [%d], column[%s], value[%s], position[%d]. "
+ "Consider sanitizing the input string column using REPLACE(\"%s\", U&'\\0000', '') AS %s",
source, source,
rowNumber, rowNumber,
column, column,

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.frame.processor.FrameRowTooLargeException; import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.frame.write.UnsupportedColumnTypeException; import org.apache.druid.frame.write.UnsupportedColumnTypeException;
@ -31,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.msq.statistics.TooManyBucketsException; import org.apache.druid.msq.statistics.TooManyBucketsException;
import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException; import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Objects; import java.util.Objects;
@ -78,12 +80,23 @@ public class MSQErrorReport
@Nullable final Integer stageNumber, @Nullable final Integer stageNumber,
final Throwable e final Throwable e
) )
{
return fromException(taskId, host, stageNumber, e, null);
}
public static MSQErrorReport fromException(
final String taskId,
@Nullable final String host,
@Nullable final Integer stageNumber,
final Throwable e,
@Nullable final ColumnMappings columnMappings
)
{ {
return new MSQErrorReport( return new MSQErrorReport(
taskId, taskId,
host, host,
stageNumber, stageNumber,
getFaultFromException(e), getFaultFromException(e, columnMappings),
Throwables.getStackTraceAsString(e) Throwables.getStackTraceAsString(e)
); );
} }
@ -159,12 +172,17 @@ public class MSQErrorReport
'}'; '}';
} }
/**
 * Convenience overload of {@link #getFaultFromException(Throwable, ColumnMappings)} that supplies null column
 * mappings.
 */
public static MSQFault getFaultFromException(@Nullable final Throwable e)
{
return getFaultFromException(e, null);
}
/** /**
* Magical code that extracts a useful fault from an exception, even if that exception is not necessarily a * Magical code that extracts a useful fault from an exception, even if that exception is not necessarily a
* {@link MSQException}. This method walks through the causal chain, and also "knows" about various exception * {@link MSQException}. This method walks through the causal chain, and also "knows" about various exception
* types thrown by other Druid code. * types thrown by other Druid code.
*/ */
public static MSQFault getFaultFromException(@Nullable final Throwable e) public static MSQFault getFaultFromException(@Nullable final Throwable e, @Nullable final ColumnMappings columnMappings)
{ {
// Unwrap exception wrappers to find an underlying fault. The assumption here is that the topmost recognizable // Unwrap exception wrappers to find an underlying fault. The assumption here is that the topmost recognizable
// exception should be used to generate the fault code for the entire report. // exception should be used to generate the fault code for the entire report.
@ -195,10 +213,21 @@ public class MSQErrorReport
return new RowTooLargeFault(((FrameRowTooLargeException) cause).getMaxFrameSize()); return new RowTooLargeFault(((FrameRowTooLargeException) cause).getMaxFrameSize());
} else if (cause instanceof InvalidNullByteException) { } else if (cause instanceof InvalidNullByteException) {
InvalidNullByteException invalidNullByteException = (InvalidNullByteException) cause; InvalidNullByteException invalidNullByteException = (InvalidNullByteException) cause;
String columnName = invalidNullByteException.getColumn();
if (columnMappings != null) {
IntList outputColumnsForQueryColumn = columnMappings.getOutputColumnsForQueryColumn(columnName);
// outputColumnsForQueryColumn.size should always be 1 due to hasUniqueOutputColumnNames check that is done
if (outputColumnsForQueryColumn.size() >= 1) {
int outputColumn = outputColumnsForQueryColumn.getInt(0);
columnName = columnMappings.getOutputColumnName(outputColumn);
}
}
return new InvalidNullByteFault( return new InvalidNullByteFault(
invalidNullByteException.getSource(), invalidNullByteException.getSource(),
invalidNullByteException.getRowNumber(), invalidNullByteException.getRowNumber(),
invalidNullByteException.getColumn(), columnName,
invalidNullByteException.getValue(), invalidNullByteException.getValue(),
invalidNullByteException.getPosition() invalidNullByteException.getPosition()
); );

View File

@ -43,7 +43,10 @@ public class ParseExceptionUtils
public static String generateReadableInputSourceNameFromMappedSegment(Segment segment) public static String generateReadableInputSourceNameFromMappedSegment(Segment segment)
{ {
if (segment instanceof ExternalSegment) { if (segment instanceof ExternalSegment) {
return StringUtils.format("external input source: %s", ((ExternalSegment) segment).externalInputSource().toString()); return StringUtils.format(
"external input source: %s",
((ExternalSegment) segment).externalInputSource().toString()
);
} else if (segment instanceof LookupSegment) { } else if (segment instanceof LookupSegment) {
return StringUtils.format("lookup input source: %s", segment.getId().getDataSource()); return StringUtils.format("lookup input source: %s", segment.getId().getDataSource());
} else if (segment instanceof QueryableIndexSegment) { } else if (segment instanceof QueryableIndexSegment) {

View File

@ -103,6 +103,44 @@ public class MSQParseExceptionsTest extends MSQTestBase
.verifyResults(); .verifyResults();
} }
/**
 * Ingests a single inline external row whose "text" field contains a null byte (\u0000), selecting it through
 * REPLACE("text", 'a', 'A') AS "text", and expects the query to fail with an {@link InvalidNullByteFault} that
 * names the column "text".
 */
@Test
public void testIngestWithNullByteInSqlExpression()
{
RowSignature rowSignature = RowSignature.builder()
.add("desc", ColumnType.STRING)
.add("text", ColumnType.STRING)
.build();
testIngestQuery()
.setSql(""
+ "WITH \"ext\" AS (SELECT *\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"{\\\"desc\\\":\\\"Row with NULL\\\",\\\"text\\\":\\\"There is a null in\\\\u0000 here somewhere\\\"}\\n\"}',\n"
+ " '{\"type\":\"json\"}'\n"
+ " )\n"
+ ") EXTEND (\"desc\" VARCHAR, \"text\" VARCHAR))\n"
+ "SELECT\n"
+ " \"desc\",\n"
+ " REPLACE(\"text\", 'a', 'A') AS \"text\"\n"
+ "FROM \"ext\"\n"
+ "")
.setExpectedRowSignature(rowSignature)
.setExpectedDataSource("foo1")
.setExpectedMSQFault(
// The expected value has 'a' already replaced by 'A', i.e. it is the output of the REPLACE expression rather
// than the raw input; the null byte sits at index 18 of that string.
new InvalidNullByteFault(
"external input source: InlineInputSource{data='{\"desc\":\"Row with NULL\",\"text\":\"There is a null in\\u0000 here somewhere\"}\n'}",
1,
"text",
"There is A null in\u0000 here somewhere",
18
)
)
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.verifyResults();
}
@Test @Test
public void testIngestWithSanitizedNullByte() throws IOException public void testIngestWithSanitizedNullByte() throws IOException
{ {