From f546cd64a9640ed14c9c0f44481678d39530e3b5 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 27 Jun 2023 09:25:32 +0530 Subject: [PATCH] MSQ: Ensure that the allocated segment aligns with the requested granularity (#14475) Changes: - Throw an `InsertCannotAllocateSegmentFault` if the allocated segment is not aligned with the requested granularity. - Tests to verify new behaviour --- docs/multi-stage-query/reference.md | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 17 +++- .../InsertCannotAllocateSegmentFault.java | 70 ++++++++++++- .../apache/druid/msq/util/IntervalUtils.java | 22 ++++- .../apache/druid/msq/exec/MSQFaultsTest.java | 54 +++++++++- .../msq/indexing/error/MSQFaultSerdeTest.java | 7 +- .../druid/msq/util/IntervalUtilsTest.java | 99 +++++++++++++++++++ .../util/common/granularity/Granularity.java | 3 +- 8 files changed, 262 insertions(+), 12 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index d56d86964f1..ec6a5b1543d 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -423,7 +423,7 @@ The following table describes error codes you may encounter in the `multiStageQu | `CannotParseExternalData` | A worker task could not parse data from an external datasource. | `errorMessage`: More details on why parsing failed. | | `ColumnNameRestricted` | The query uses a restricted column name. | `columnName`: The restricted column name. | | `ColumnTypeNotSupported` | The column type is not supported. This can be because:

| `columnName`: The column name with an unsupported type.

`columnType`: The unknown column type. | -| `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:

| `dataSource`

`interval`: The interval for the attempted new segment allocation. | +| `InsertCannotAllocateSegment` | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:



Use REPLACE to overwrite the existing data. Alternatively, if the error contains the `allocatedInterval`, rerun the INSERT job with the granularity of that interval to append to the existing data. Note that appending to existing data using INSERT is not always possible; it can only be done if `allocatedInterval` is present. | `dataSource`

`interval`: The interval for the attempted new segment allocation.

`allocatedInterval`: The incorrect interval allocated by the overlord. It is null when the overlord could not allocate any segment. | | `InsertCannotBeEmpty` | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` | | `InsertLockPreempted` | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | | | `InsertTimeNull` | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.

This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.| diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index dd163d406d3..db5e6997179 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -940,7 +940,22 @@ public class ControllerImpl implements Controller throw new MSQException( new InsertCannotAllocateSegmentFault( task.getDataSource(), - segmentGranularity.bucket(timestamp) + segmentGranularity.bucket(timestamp), + null + ) + ); + } + + // Even if allocation isn't null, the overlord makes the best effort job of allocating a segment with the given + // segmentGranularity. This is commonly seen in case when there is already a coarser segment in the interval where + // the requested segment is present and that segment completely overlaps the request. 
Throw an error if the interval + // doesn't match the granularity requested + if (!IntervalUtils.isAligned(allocation.getInterval(), segmentGranularity)) { + throw new MSQException( + new InsertCannotAllocateSegmentFault( + task.getDataSource(), + segmentGranularity.bucket(timestamp), + allocation.getInterval() ) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java index 403af37d9bf..f632ae67736 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertCannotAllocateSegmentFault.java @@ -23,8 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.DurationGranularity; +import org.apache.druid.java.util.common.granularity.GranularityType; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.Objects; @JsonTypeName(InsertCannotAllocateSegmentFault.CODE) @@ -35,15 +39,20 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault private final String dataSource; private final Interval interval; + @Nullable + private final Interval allocatedInterval; + @JsonCreator public InsertCannotAllocateSegmentFault( @JsonProperty("dataSource") final String dataSource, - @JsonProperty("interval") final Interval interval + @JsonProperty("interval") final Interval interval, + @Nullable @JsonProperty("allocatedInterval") final Interval allocatedInterval ) { - super(CODE, "Cannot allocate segment for 
dataSource [%s], interval [%s]", dataSource, interval); + super(CODE, getErrorMessage(dataSource, interval, allocatedInterval)); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.interval = Preconditions.checkNotNull(interval, "interval"); + this.allocatedInterval = allocatedInterval; } @JsonProperty @@ -58,6 +67,57 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault return interval; } + @Nullable + @JsonProperty + public Interval getAllocatedInterval() + { + return allocatedInterval; + } + + private static String getErrorMessage( + final String dataSource, + final Interval interval, + @Nullable final Interval allocatedInterval + ) + { + String errorMessage; + if (allocatedInterval == null) { + errorMessage = StringUtils.format( + "Cannot allocate segment for dataSource [%s], interval [%s]. This can happen if the prior ingestion " + + "uses non-extendable shard specs or if the partitioned by granularity is different from the granularity of the " + + "pre-existing segments. Check the granularities of the pre-existing segments or re-run the ingestion with REPLACE " + + "to overwrite over the existing data", + dataSource, + interval + ); + } else { + errorMessage = StringUtils.format( + "Requested segment for dataSource [%s], interval [%s], but got [%s] interval instead. " + + "This happens when an overlapping segment is already present with a coarser granularity for the requested interval. " + + "Either set the partition granularity for the INSERT to [%s] to append to existing data or use REPLACE to " + + "overwrite over the pre-existing segment", + dataSource, + interval, + allocatedInterval, + convertIntervalToGranularityString(allocatedInterval) + ); + } + return errorMessage; + } + + /** + * Converts the given interval to a string representing the granularity which is more user-friendly. 
+ */ + private static String convertIntervalToGranularityString(final Interval interval) + { + try { + return GranularityType.fromPeriod(interval.toPeriod()).name(); + } + catch (Exception e) { + return new DurationGranularity(interval.toDurationMillis(), null).toString(); + } + } + @Override public boolean equals(Object o) { @@ -71,12 +131,14 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault return false; } InsertCannotAllocateSegmentFault that = (InsertCannotAllocateSegmentFault) o; - return Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval); + return Objects.equals(dataSource, that.dataSource) + && Objects.equals(interval, that.interval) + && Objects.equals(allocatedInterval, that.allocatedInterval); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), dataSource, interval); + return Objects.hash(super.hashCode(), dataSource, interval, allocatedInterval); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java index 43a844b5a6f..328a0c0b74d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/IntervalUtils.java @@ -19,13 +19,16 @@ package org.apache.druid.msq.util; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.java.util.common.granularity.Granularity; import org.joda.time.Interval; import java.util.ArrayList; import java.util.List; /** - * Things that would make sense in {@link org.apache.druid.java.util.common.Intervals} if this were not an extension. + * Things that would make sense in {@link Intervals} if this were not an extension. 
*/ public class IntervalUtils { @@ -61,4 +64,21 @@ public class IntervalUtils return retVal; } + + /** + * Checks whether the provided interval is aligned with the granularity; for {@link AllGranularity}, it must be {@link Intervals#ETERNITY}. + * This is used to check if the interval of the segment allocated by the overlord matches the granularity requested in the + * SQL query. + */ + public static boolean isAligned( + final Interval interval, + final Granularity granularity + ) + { + // AllGranularity needs special handling since AllGranularity#isAligned always returns false + if (granularity instanceof AllGranularity) { + return Intervals.isEternity(interval); + } + return granularity.isAligned(interval); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 646286acaf5..cfcc1073331 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -36,6 +36,8 @@ import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.junit.Test; import org.mockito.Mockito; @@ -51,7 +53,7 @@ import static org.mockito.ArgumentMatchers.isA; public class MSQFaultsTest extends MSQTestBase { @Test - public void testInsertCannotAllocateSegmentFault() + public void testInsertCannotAllocateSegmentFaultWhenNullAllocation() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -62,18 +64,64 @@ public class MSQFaultsTest extends MSQTestBase
Mockito.doReturn(null).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class)); testIngestQuery().setSql( - "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1") + "insert into foo1" + + " select __time, dim1 , count(*) as cnt" + + " from foo" + + " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'" + + " group by 1, 2" + + " PARTITIONED by day" + + " clustered by dim1" + ) .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) .setExpectedMSQFault( new InsertCannotAllocateSegmentFault( "foo1", - Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z") + Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"), + null ) ) .verifyResults(); } + @Test + public void testInsertCannotAllocateSegmentFaultWhenInvalidAllocation() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + // Return a segment allocated with a coarser (month) interval than the DAY granularity requested by the query.
+ Mockito.doReturn(new SegmentIdWithShardSpec( + "foo1", + Intervals.of("2000-01-01/2000-02-01"), + "test", + new LinearShardSpec(2) + )).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class)); + + testIngestQuery().setSql( + "insert into foo1" + + " select __time, dim1 , count(*) as cnt" + + " from foo" + + " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'" + + " group by 1, 2" + + " PARTITIONED by day" + + " clustered by dim1" + ) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setExpectedMSQFault( + new InsertCannotAllocateSegmentFault( + "foo1", + Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"), + Intervals.of("2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z") + ) + ) + .verifyResults(); + } + + @Test public void testInsertCannotBeEmptyFault() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java index c2ebb2d25ac..bcd0de1ef68 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java @@ -54,7 +54,12 @@ public class MSQFaultSerdeTest assertFaultSerde(new ColumnTypeNotSupportedFault("the column", null)); assertFaultSerde(new ColumnTypeNotSupportedFault("the column", ColumnType.STRING_ARRAY)); assertFaultSerde(new ColumnNameRestrictedFault("the column")); - assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY)); + assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY, null)); + assertFaultSerde(new InsertCannotAllocateSegmentFault( + "the datasource", + Intervals.of("2000-01-01/2002-01-01"), + Intervals.ETERNITY + )); 
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource")); assertFaultSerde(InsertLockPreemptedFault.INSTANCE); assertFaultSerde(InsertTimeNullFault.INSTANCE); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java index 6dd47313e7b..03371f8b90f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/IntervalUtilsTest.java @@ -19,8 +19,12 @@ package org.apache.druid.msq.util; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -84,6 +88,101 @@ public class IntervalUtilsTest ); } + @Test + public void test_doesIntervalMatchesGranularity_withStandardGranularities() + { + + Assert.assertTrue( + IntervalUtils.isAligned(Intervals.ETERNITY, Granularities.ALL) + ); + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2001-01-01"), Granularities.YEAR + ) + ); + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2000-04-01"), Granularities.QUARTER + ) + ); + + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2000-02-01"), Granularities.MONTH + ) + ); + + // With the way WEEK granularities work, this needs to be aligned to an actual week + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("1999-12-27/2000-01-03"), Granularities.WEEK + ) + ); + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2000-01-02"), Granularities.DAY + ) + ); + + Assert.assertTrue( + 
IntervalUtils.isAligned( + Intervals.of("2000-01-01T00:00:00.000/2000-01-01T08:00:00.000"), Granularities.EIGHT_HOUR + ) + ); + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01T00:00:00.000/2000-01-01T01:00:00.000"), Granularities.HOUR + ) + ); + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:01:00.000"), Granularities.MINUTE + ) + ); + + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:00:01.000"), Granularities.SECOND + ) + ); + + Assert.assertFalse( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2002-01-01"), Granularities.YEAR + ) + ); + + Assert.assertFalse( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2002-01-08"), Granularities.YEAR + ) + ); + } + + @Test + public void test_doesIntervalMatchesGranularity_withPeriodGranularity() + { + Assert.assertTrue( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2000-01-04"), + new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-01"), null) + ) + ); + + Assert.assertFalse( + IntervalUtils.isAligned( + Intervals.of("2000-01-01/2000-01-04"), + new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-02"), null) + ) + ); + } + public static List intervals(final String... 
intervals) { return Arrays.stream(intervals).map(Intervals::of).collect(Collectors.toList()); diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java index 812538ae6e0..6f5d9fb61e1 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/Granularity.java @@ -145,7 +145,8 @@ public abstract class Granularity implements Cacheable public abstract DateTime toDate(String filePath, Formatter formatter); /** - * Return true if time chunks populated by this granularity includes the given interval time chunk. + * Return true only if the time chunks populated by this granularity include the given interval time chunk. The + * interval must fit exactly into the scheme of the granularity for this to return true. */ public abstract boolean isAligned(Interval interval);