From f9f734cde5b6dd6e540b189645de0eaf6f18a091 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 24 Aug 2023 04:24:41 +0000 Subject: [PATCH] Display the output column name in InvalidNullByteException (#14780) This PR maps the query column to the output column name while surfacing the fault since that is readily visible to the user while executing the query. --- .../apache/druid/msq/exec/ControllerImpl.java | 50 +++++++++++++++++-- .../indexing/error/InvalidNullByteFault.java | 3 +- .../msq/indexing/error/MSQErrorReport.java | 35 +++++++++++-- .../druid/msq/input/ParseExceptionUtils.java | 5 +- .../msq/exec/MSQParseExceptionsTest.java | 38 ++++++++++++++ 5 files changed, 123 insertions(+), 8 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 19733dea0ac..d883a587e9b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -55,6 +55,7 @@ import org.apache.druid.frame.key.RowKey; import org.apache.druid.frame.key.RowKeyReader; import org.apache.druid.frame.processor.FrameProcessorExecutor; import org.apache.druid.frame.util.DurableStorageUtils; +import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; @@ -104,6 +105,7 @@ import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault; import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault; +import org.apache.druid.msq.indexing.error.InvalidNullByteFault; import org.apache.druid.msq.indexing.error.MSQErrorReport; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQFault; @@ -412,9 +414,15 @@ public class ControllerImpl implements Controller final String selfHost = MSQTasks.getHostFromSelfNode(selfDruidNode); final MSQErrorReport controllerError = exceptionEncountered != null - ? MSQErrorReport.fromException(id(), selfHost, null, exceptionEncountered) + ? MSQErrorReport.fromException( + id(), + selfHost, + null, + exceptionEncountered, + task.getQuerySpec().getColumnMappings() + ) : null; - final MSQErrorReport workerError = workerErrorRef.get(); + MSQErrorReport workerError = workerErrorRef.get(); taskStateForReport = TaskState.FAILED; errorForReport = MSQTasks.makeErrorReport(id(), selfHost, controllerError, workerError); @@ -748,7 +756,10 @@ public class ControllerImpl implements Controller !workerTaskLauncher.isTaskLatest(errorReport.getTaskId())) { log.info("Ignoring task %s", errorReport.getTaskId()); } else { - workerErrorRef.compareAndSet(null, errorReport); + workerErrorRef.compareAndSet( + null, + mapQueryColumnNameToOutputColumnName(errorReport) + ); } } @@ -2651,6 +2662,39 @@ public class ControllerImpl implements Controller return mergeMode; } + /** + * Maps the query column names (used internally while generating the query plan) to output column names (the one used + * by the user in the SQL query) for certain errors reported by workers (where they have limited knowledge of the + * ColumnMappings). For remaining errors not relying on the query column names, it returns it as is. + */ + @Nullable + private MSQErrorReport mapQueryColumnNameToOutputColumnName( + @Nullable final MSQErrorReport workerErrorReport + ) + { + + if (workerErrorReport == null) { + return null; + } else if (workerErrorReport.getFault() instanceof InvalidNullByteFault) { + InvalidNullByteFault inbf = (InvalidNullByteFault) workerErrorReport.getFault(); + return MSQErrorReport.fromException( + workerErrorReport.getTaskId(), + workerErrorReport.getHost(), + workerErrorReport.getStageNumber(), + InvalidNullByteException.builder() + .source(inbf.getSource()) + .rowNumber(inbf.getRowNumber()) + .column(inbf.getColumn()) + .value(inbf.getValue()) + .position(inbf.getPosition()) + .build(), + task.getQuerySpec().getColumnMappings() + ); + } else { + return workerErrorReport; + } + } + /** * Interface used by {@link #contactWorkersForStage}. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java index 3b2a1881fa1..0e85e6817d4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InvalidNullByteFault.java @@ -63,7 +63,8 @@ public class InvalidNullByteFault extends BaseMSQFault { super( CODE, - "Invalid null byte at source [%s], rowNumber [%d], column[%s], value[%s], position[%d]. Consider sanitizing the string using REPLACE(\"%s\", U&'\\0000', '') AS %s", + "Invalid null byte at source [%s], rowNumber [%d], column[%s], value[%s], position[%d]. " + + "Consider sanitizing the input string column using REPLACE(\"%s\", U&'\\0000', '') AS %s", source, rowNumber, column, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java index 09d7c258ec0..ffad2c0c80f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQErrorReport.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import it.unimi.dsi.fastutil.ints.IntList; import org.apache.druid.frame.processor.FrameRowTooLargeException; import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.frame.write.UnsupportedColumnTypeException; @@ -31,6 +32,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.msq.statistics.TooManyBucketsException; import org.apache.druid.query.groupby.epinephelinae.UnexpectedMultiValueDimensionException; +import org.apache.druid.sql.calcite.planner.ColumnMappings; import javax.annotation.Nullable; import java.util.Objects; @@ -78,12 +80,23 @@ public class MSQErrorReport @Nullable final Integer stageNumber, final Throwable e ) + { + return fromException(taskId, host, stageNumber, e, null); + } + + public static MSQErrorReport fromException( + final String taskId, + @Nullable final String host, + @Nullable final Integer stageNumber, + final Throwable e, + @Nullable final ColumnMappings columnMappings + ) { return new MSQErrorReport( taskId, host, stageNumber, - getFaultFromException(e), + getFaultFromException(e, columnMappings), Throwables.getStackTraceAsString(e) ); } @@ -159,12 +172,17 @@ public class MSQErrorReport '}'; } + public static MSQFault getFaultFromException(@Nullable final Throwable e) + { + return getFaultFromException(e, null); + } + /** * Magical code that extracts a useful fault from an exception, even if that exception is not necessarily a * {@link MSQException}. This method walks through the causal chain, and also "knows" about various exception * types thrown by other Druid code. */ - public static MSQFault getFaultFromException(@Nullable final Throwable e) + public static MSQFault getFaultFromException(@Nullable final Throwable e, @Nullable final ColumnMappings columnMappings) { // Unwrap exception wrappers to find an underlying fault. The assumption here is that the topmost recognizable // exception should be used to generate the fault code for the entire report. @@ -195,10 +213,21 @@ public class MSQErrorReport return new RowTooLargeFault(((FrameRowTooLargeException) cause).getMaxFrameSize()); } else if (cause instanceof InvalidNullByteException) { InvalidNullByteException invalidNullByteException = (InvalidNullByteException) cause; + String columnName = invalidNullByteException.getColumn(); + if (columnMappings != null) { + IntList outputColumnsForQueryColumn = columnMappings.getOutputColumnsForQueryColumn(columnName); + + // outputColumnsForQueryColumn.size should always be 1 due to hasUniqueOutputColumnNames check that is done + if (outputColumnsForQueryColumn.size() >= 1) { + int outputColumn = outputColumnsForQueryColumn.getInt(0); + columnName = columnMappings.getOutputColumnName(outputColumn); + } + } + return new InvalidNullByteFault( invalidNullByteException.getSource(), invalidNullByteException.getRowNumber(), - invalidNullByteException.getColumn(), + columnName, invalidNullByteException.getValue(), invalidNullByteException.getPosition() ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java index bfc1769868c..0911059b364 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/ParseExceptionUtils.java @@ -43,7 +43,10 @@ public class ParseExceptionUtils public static String generateReadableInputSourceNameFromMappedSegment(Segment segment) { if (segment instanceof ExternalSegment) { - return StringUtils.format("external input source: %s", ((ExternalSegment) segment).externalInputSource().toString()); + return StringUtils.format( + "external input source: %s", + ((ExternalSegment) segment).externalInputSource().toString() + ); } else if (segment instanceof LookupSegment) { return StringUtils.format("lookup input source: %s", segment.getId().getDataSource()); } else if (segment instanceof QueryableIndexSegment) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java index 3f4c953bcb5..61a89f0d6fc 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java @@ -103,6 +103,44 @@ public class MSQParseExceptionsTest extends MSQTestBase .verifyResults(); } + @Test + public void testIngestWithNullByteInSqlExpression() + { + + RowSignature rowSignature = RowSignature.builder() + .add("desc", ColumnType.STRING) + .add("text", ColumnType.STRING) + .build(); + + testIngestQuery() + .setSql("" + + "WITH \"ext\" AS (SELECT *\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"inline\",\"data\":\"{\\\"desc\\\":\\\"Row with NULL\\\",\\\"text\\\":\\\"There is a null in\\\\u0000 here somewhere\\\"}\\n\"}',\n" + + " '{\"type\":\"json\"}'\n" + + " )\n" + + ") EXTEND (\"desc\" VARCHAR, \"text\" VARCHAR))\n" + + "SELECT\n" + + " \"desc\",\n" + + " REPLACE(\"text\", 'a', 'A') AS \"text\"\n" + + "FROM \"ext\"\n" + + "") + .setExpectedRowSignature(rowSignature) + .setExpectedDataSource("foo1") + .setExpectedMSQFault( + new InvalidNullByteFault( + "external input source: InlineInputSource{data='{\"desc\":\"Row with NULL\",\"text\":\"There is a null in\\u0000 here somewhere\"}\n'}", + 1, + "text", + "There is A null in\u0000 here somewhere", + 18 + ) + ) + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .verifyResults(); + } + @Test public void testIngestWithSanitizedNullByte() throws IOException {