Add validation for reindex with realtime sources (#16390)


With the addition of concurrent compaction, an MSQ job can ingest data into a datasource while also querying realtime sources of that same datasource. This could lead to problems if the interval being ingested into is replaced by an MSQ job that has queried only some of the data from the realtime task.

This PR adds validation to check that, when a query reads from realtime sources, the datasource being ingested into is not also one of the datasources being queried.
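For illustration, this is the kind of reindexing query that now fails validation. The SQL and the `includeSegmentSource` context key are taken from the test added in this PR; the endpoint and payload shape are just one common way to submit such a query through the SQL-based ingestion task API:

```
POST /druid/v2/sql/task
{
  "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM foo PARTITIONED BY DAY",
  "context": {
    "includeSegmentSource": "REALTIME"
  }
}
```

This now fails with: `Cannot ingest into datasource[foo] since it is also being queried from, with REALTIME segments included. Ingest to a different datasource, or disable querying of realtime segments by modifying [includeSegmentSource] in the query context.` A query that writes to a different datasource, or that leaves `includeSegmentSource` at its default of `NONE`, is unaffected.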
Adarsh Sanjeev 2024-05-07 10:32:15 +05:30 committed by GitHub
parent b5958b6b07
commit 269e035e76
6 changed files with 63 additions and 2 deletions


@@ -405,7 +405,7 @@ The following table lists the context parameters for the MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
| `waitUntilSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segment to be loaded before exiting, else the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. | `false` |
| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls the sources, which will be queried for results in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only non-realtime (published and used) segments will be downloaded from deep storage. If this value is `REALTIME`, results will also be included from realtime tasks. `REALTIME` cannot be used while writing data into the same datasource it is read from. | `NONE` |
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.<br /> This property comes into effect only when `selectDestination` is set to `durableStorage` | 100000 |
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation.<br /><br />Provide the column list as comma-separated values or as a JSON array in string form.| empty list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |


@@ -171,6 +171,7 @@ import org.apache.druid.msq.util.ArrayIngestMode;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.query.Query;
@@ -1691,6 +1692,8 @@ public class ControllerImpl implements Controller
throw new ISE("Column names are not unique: [%s]", columnMappings.getOutputColumnNames());
}
MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
if (columnMappings.hasOutputColumn(ColumnHolder.TIME_COLUMN_NAME)) {
// We know there's a single time column, because we've checked columnMappings.hasUniqueOutputColumnNames().
final int timeColumn = columnMappings.getOutputColumnsByName(ColumnHolder.TIME_COLUMN_NAME).getInt(0);


@@ -315,7 +315,7 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery,
}
/**
* Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail
* instead of reading any unused segments to ensure that old data is not read.
*/
public static boolean isReplaceInputDataSourceTask(MSQSpec querySpec)


@@ -281,6 +281,8 @@ public class MSQTaskQueryMaker implements QueryMaker
.tuningConfig(new MSQTuningConfig(maxNumWorkers, maxRowsInMemory, rowsPerSegment, indexSpec))
.build();
MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec.withOverriddenContext(nativeQueryContext),


@@ -20,8 +20,13 @@
package org.apache.druid.msq.util;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.segment.column.ColumnHolder;
import java.util.Collection;
@@ -90,4 +95,23 @@ public class MSQTaskQueryMakerUtils
throw new IAE("Segment sort order must begin with column [%s]", ColumnHolder.TIME_COLUMN_NAME);
}
}
/**
* Validates that a query does not read from a datasource that it is ingesting data into, if realtime segments are
* being queried.
*/
public static void validateRealtimeReindex(final MSQSpec querySpec)
{
final SegmentSource segmentSources = MultiStageQueryContext.getSegmentSources(querySpec.getQuery().context());
if (MSQControllerTask.isReplaceInputDataSourceTask(querySpec) && SegmentSource.REALTIME.equals(segmentSources)) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build("Cannot ingest into datasource[%s] since it is also being queried from, with "
+ "REALTIME segments included. Ingest to a different datasource, or disable querying "
+ "of realtime segments by modifying [%s] in the query context.",
((DataSourceMSQDestination) querySpec.getDestination()).getDataSource(),
MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE
);
}
}
}


@@ -29,6 +29,8 @@ import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -57,6 +59,7 @@ import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatcher;
@@ -1840,6 +1843,35 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
@Test
void testRealtimeQueryWithReindexShouldThrowException()
{
Map<String, Object> context = ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.REALTIME.name())
.build();
testIngestQuery().setSql(
"REPLACE INTO foo"
+ " OVERWRITE ALL"
+ " SELECT *"
+ " FROM foo"
+ " PARTITIONED BY DAY")
.setQueryContext(context)
.setExpectedValidationErrorMatcher(
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"general"
).expectMessageContains(
"Cannot ingest into datasource[foo] since it is also being queried from, with REALTIME "
+ "segments included. Ingest to a different datasource, or disable querying of realtime "
+ "segments by modifying [includeSegmentSource] in the query context.")
)
.verifyPlanningErrors();
}
@Nonnull
private Set<SegmentId> expectedFooSegments()
{