MSQ: Ensure that the allocated segment aligns with the requested granularity (#14475)

Changes:
- Throw an `InsertCannotAllocateSegmentFault` if the allocated segment is not aligned with
the requested granularity.
- Tests to verify new behaviour
This commit is contained in:
Laksh Singla 2023-06-27 09:25:32 +05:30 committed by GitHub
parent 903addf7c2
commit f546cd64a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 262 additions and 12 deletions

View File

@ -423,7 +423,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_CannotParseExternalData">`CannotParseExternalData`</a> | A worker task could not parse data from an external datasource. | `errorMessage`: More details on why parsing failed. |
| <a name="error_ColumnNameRestricted">`ColumnNameRestricted`</a> | The query uses a restricted column name. | `columnName`: The restricted column name. |
| <a name="error_ColumnTypeNotSupported">`ColumnTypeNotSupported`</a> | The column type is not supported. This can be because:<br /> <br /><ul><li>Support for writing or reading from a particular column type is not supported.</li><li>The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames.</li></ul> | `columnName`: The column name with an unsupported type.<br /> <br />`columnType`: The unknown column type. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul>| `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. |
| <a name="error_InsertCannotAllocateSegment">`InsertCannotAllocateSegment`</a> | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:<br /> <br /><ul><li>Attempting to mix different granularities in the same intervals of the same datasource.</li><li>Prior ingestions that used non-extendable shard specs.</li></ul> <br /> <br /> Use REPLACE to overwrite the existing data. Alternatively, if the error contains an `allocatedInterval`, rerun the INSERT job with the granularity that matches that interval to append to the existing data. Note that appending to the existing data using INSERT is only possible when `allocatedInterval` is present. | `dataSource`<br /> <br />`interval`: The interval for the attempted new segment allocation. <br /> <br /> `allocatedInterval`: The incorrect interval allocated by the overlord. It can be null. |
| <a name="error_InsertCannotBeEmpty">`InsertCannotBeEmpty`</a> | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` |
| <a name="error_InsertLockPreempted">`InsertLockPreempted`</a> | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | |
| <a name="error_InsertTimeNull">`InsertTimeNull`</a> | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.<br /><br />This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. ([`TIME_PARSE`](../querying/sql-scalar.md#date-and-time-functions) returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern. Or, if your timestamps may genuinely be null, consider using [`COALESCE`](../querying/sql-scalar.md#other-scalar-functions) to provide a default value. One option is [`CURRENT_TIMESTAMP`](../querying/sql-scalar.md#date-and-time-functions), which represents the start time of the job.|

View File

@ -940,7 +940,22 @@ public class ControllerImpl implements Controller
throw new MSQException(
new InsertCannotAllocateSegmentFault(
task.getDataSource(),
segmentGranularity.bucket(timestamp)
segmentGranularity.bucket(timestamp),
null
)
);
}
// Even if the allocation isn't null, the overlord only makes a best-effort attempt to allocate a segment with the
// given segmentGranularity. A mismatch is commonly seen when a coarser segment already exists in the interval that
// contains the requested segment and that segment completely overlaps the request. Throw an error if the allocated
// interval doesn't match the requested granularity.
if (!IntervalUtils.isAligned(allocation.getInterval(), segmentGranularity)) {
throw new MSQException(
new InsertCannotAllocateSegmentFault(
task.getDataSource(),
segmentGranularity.bucket(timestamp),
allocation.getInterval()
)
);
}

View File

@ -23,8 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.DurationGranularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Objects;
@JsonTypeName(InsertCannotAllocateSegmentFault.CODE)
@ -35,15 +39,20 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault
private final String dataSource;
private final Interval interval;
@Nullable
private final Interval allocatedInterval;
@JsonCreator
public InsertCannotAllocateSegmentFault(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") final Interval interval
@JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("allocatedInterval") final Interval allocatedInterval
)
{
super(CODE, "Cannot allocate segment for dataSource [%s], interval [%s]", dataSource, interval);
super(CODE, getErrorMessage(dataSource, interval, allocatedInterval));
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
this.allocatedInterval = allocatedInterval;
}
@JsonProperty
@ -58,6 +67,57 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault
return interval;
}
/**
 * Returns the interval that was actually allocated when it differed from the requested one, or null when this
 * fault was created without an allocated interval.
 */
@Nullable
@JsonProperty
public Interval getAllocatedInterval()
{
  return this.allocatedInterval;
}
/**
 * Builds the human-readable message for this fault.
 *
 * @param dataSource        datasource the segment was requested for
 * @param interval          interval that was requested
 * @param allocatedInterval interval that was handed back instead, or null if nothing was handed back
 *
 * @return a message describing the allocation failure and how the user may work around it
 */
private static String getErrorMessage(
    final String dataSource,
    final Interval interval,
    @Nullable final Interval allocatedInterval
)
{
  if (allocatedInterval != null) {
    // A segment came back, but over a different interval than requested: suggest the matching granularity.
    return StringUtils.format(
        "Requested segment for dataSource [%s], interval [%s], but got [%s] interval instead. "
        + "This happens when an overlapping segment is already present with a coarser granularity for the requested interval. "
        + "Either set the partition granularity for the INSERT to [%s] to append to existing data or use REPLACE to "
        + "overwrite over the pre-existing segment",
        dataSource,
        interval,
        allocatedInterval,
        convertIntervalToGranularityString(allocatedInterval)
    );
  }

  // Nothing came back at all: only the generic explanation can be offered.
  return StringUtils.format(
      "Cannot allocate segment for dataSource [%s], interval [%s]. This can happen if the prior ingestion "
      + "uses non-extendable shard specs or if the partitioned by granularity is different from the granularity of the "
      + "pre-existing segments. Check the granularities of the pre-existing segments or re-run the ingestion with REPLACE "
      + "to overwrite over the existing data",
      dataSource,
      interval
  );
}
/**
 * Renders the given interval as a granularity name, which is friendlier to show to users than a raw interval.
 * The period of the interval is first looked up as a {@link GranularityType}; if that lookup fails for any reason,
 * the string form of a {@link DurationGranularity} built from the interval's duration in milliseconds is used.
 */
private static String convertIntervalToGranularityString(final Interval interval)
{
  try {
    final GranularityType matchingType = GranularityType.fromPeriod(interval.toPeriod());
    return matchingType.name();
  }
  catch (Exception ignored) {
    // Best effort: this only feeds an error message, so any failure falls back to a duration-based description.
    final long durationMillis = interval.toDurationMillis();
    return new DurationGranularity(durationMillis, null).toString();
  }
}
@Override
public boolean equals(Object o)
{
@ -71,12 +131,14 @@ public class InsertCannotAllocateSegmentFault extends BaseMSQFault
return false;
}
InsertCannotAllocateSegmentFault that = (InsertCannotAllocateSegmentFault) o;
return Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval);
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(allocatedInterval, that.allocatedInterval);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), dataSource, interval);
return Objects.hash(super.hashCode(), dataSource, interval, allocatedInterval);
}
}

View File

@ -19,13 +19,16 @@
package org.apache.druid.msq.util;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
/**
* Things that would make sense in {@link org.apache.druid.java.util.common.Intervals} if this were not an extension.
* Things that would make sense in {@link Intervals} if this were not an extension.
*/
public class IntervalUtils
{
@ -61,4 +64,21 @@ public class IntervalUtils
return retVal;
}
/**
 * Checks whether the given interval lines up with the given granularity. For an {@link AllGranularity} the interval
 * must be eternity, as decided by {@link Intervals#isEternity}; for every other granularity the decision is delegated
 * to {@link Granularity#isAligned}.
 *
 * This is used to check that the segment allocation made by the overlord matches the granularity requested in the
 * SQL query.
 */
public static boolean isAligned(
    final Interval interval,
    final Granularity granularity
)
{
  // AllGranularity is special-cased rather than delegated to.
  // NOTE(review): presumably AllGranularity#isAligned rejects every interval, which is why it cannot be used
  // here - confirm.
  final boolean isAllGranularity = granularity instanceof AllGranularity;
  return isAllGranularity ? Intervals.isEternity(interval) : granularity.isAligned(interval);
}
}

View File

@ -36,6 +36,8 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Test;
import org.mockito.Mockito;
@ -51,7 +53,7 @@ import static org.mockito.ArgumentMatchers.isA;
public class MSQFaultsTest extends MSQTestBase
{
@Test
public void testInsertCannotAllocateSegmentFault()
public void testInsertCannotAllocateSegmentFaultWhenNullAllocation()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -62,18 +64,64 @@ public class MSQFaultsTest extends MSQTestBase
Mockito.doReturn(null).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));
testIngestQuery().setSql(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00' group by 1, 2 PARTITIONED by day clustered by dim1")
"insert into foo1"
+ " select __time, dim1 , count(*) as cnt"
+ " from foo"
+ " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
+ " group by 1, 2"
+ " PARTITIONED by day"
+ " clustered by dim1"
)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setExpectedMSQFault(
new InsertCannotAllocateSegmentFault(
"foo1",
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")
Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
null
)
)
.verifyResults();
}
@Test
public void testInsertCannotAllocateSegmentFaultWhenInvalidAllocation()
{
  final RowSignature expectedSignature = RowSignature.builder()
                                                     .add("__time", ColumnType.LONG)
                                                     .add("dim1", ColumnType.STRING)
                                                     .add("cnt", ColumnType.LONG)
                                                     .build();

  // The task action client answers the allocation request with a segment spanning a whole month, although the
  // query below is PARTITIONED by day. The returned interval is therefore not the one that was requested.
  final SegmentIdWithShardSpec monthWideSegment = new SegmentIdWithShardSpec(
      "foo1",
      Intervals.of("2000-01-01/2000-02-01"),
      "test",
      new LinearShardSpec(2)
  );
  Mockito.doReturn(monthWideSegment).when(testTaskActionClient).submit(isA(SegmentAllocateAction.class));

  final String sql =
      "insert into foo1"
      + " select __time, dim1 , count(*) as cnt"
      + " from foo"
      + " where dim1 is not null and __time >= TIMESTAMP '2000-01-02 00:00:00' and __time < TIMESTAMP '2000-01-03 00:00:00'"
      + " group by 1, 2"
      + " PARTITIONED by day"
      + " clustered by dim1";

  // The expected fault carries both the requested (day) interval and the allocated (month) interval.
  final InsertCannotAllocateSegmentFault expectedFault = new InsertCannotAllocateSegmentFault(
      "foo1",
      Intervals.of("2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z"),
      Intervals.of("2000-01-01T00:00:00.000Z/2000-02-01T00:00:00.000Z")
  );

  testIngestQuery().setSql(sql)
                   .setExpectedDataSource("foo1")
                   .setExpectedRowSignature(expectedSignature)
                   .setExpectedMSQFault(expectedFault)
                   .verifyResults();
}
@Test
public void testInsertCannotBeEmptyFault()
{

View File

@ -54,7 +54,12 @@ public class MSQFaultSerdeTest
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", null));
assertFaultSerde(new ColumnTypeNotSupportedFault("the column", ColumnType.STRING_ARRAY));
assertFaultSerde(new ColumnNameRestrictedFault("the column"));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY));
assertFaultSerde(new InsertCannotAllocateSegmentFault("the datasource", Intervals.ETERNITY, null));
assertFaultSerde(new InsertCannotAllocateSegmentFault(
"the datasource",
Intervals.of("2000-01-01/2002-01-01"),
Intervals.ETERNITY
));
assertFaultSerde(new InsertCannotBeEmptyFault("the datasource"));
assertFaultSerde(InsertLockPreemptedFault.INSTANCE);
assertFaultSerde(InsertTimeNullFault.INSTANCE);

View File

@ -19,8 +19,12 @@
package org.apache.druid.msq.util;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@ -84,6 +88,101 @@ public class IntervalUtilsTest
);
}
@Test
public void test_doesIntervalMatchesGranularity_withStandardGranularities()
{
  // Eternity is the only interval checked against the ALL granularity.
  Assert.assertTrue(IntervalUtils.isAligned(Intervals.ETERNITY, Granularities.ALL));

  // Intervals that span exactly one bucket of the calendar-based granularities.
  final Interval oneYear = Intervals.of("2000-01-01/2001-01-01");
  Assert.assertTrue(IntervalUtils.isAligned(oneYear, Granularities.YEAR));

  final Interval oneQuarter = Intervals.of("2000-01-01/2000-04-01");
  Assert.assertTrue(IntervalUtils.isAligned(oneQuarter, Granularities.QUARTER));

  final Interval oneMonth = Intervals.of("2000-01-01/2000-02-01");
  Assert.assertTrue(IntervalUtils.isAligned(oneMonth, Granularities.MONTH));

  // With the way WEEK granularities work, this needs to be aligned to an actual week
  final Interval oneWeek = Intervals.of("1999-12-27/2000-01-03");
  Assert.assertTrue(IntervalUtils.isAligned(oneWeek, Granularities.WEEK));

  final Interval oneDay = Intervals.of("2000-01-01/2000-01-02");
  Assert.assertTrue(IntervalUtils.isAligned(oneDay, Granularities.DAY));

  // Intervals that span exactly one bucket of the sub-day granularities.
  final Interval eightHours = Intervals.of("2000-01-01T00:00:00.000/2000-01-01T08:00:00.000");
  Assert.assertTrue(IntervalUtils.isAligned(eightHours, Granularities.EIGHT_HOUR));

  final Interval oneHour = Intervals.of("2000-01-01T00:00:00.000/2000-01-01T01:00:00.000");
  Assert.assertTrue(IntervalUtils.isAligned(oneHour, Granularities.HOUR));

  final Interval oneMinute = Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:01:00.000");
  Assert.assertTrue(IntervalUtils.isAligned(oneMinute, Granularities.MINUTE));

  final Interval oneSecond = Intervals.of("2000-01-01T00:00:00.000/2000-01-01T00:00:01.000");
  Assert.assertTrue(IntervalUtils.isAligned(oneSecond, Granularities.SECOND));

  // Intervals longer than a single YEAR bucket are expected to be rejected.
  final Interval twoYears = Intervals.of("2000-01-01/2002-01-01");
  Assert.assertFalse(IntervalUtils.isAligned(twoYears, Granularities.YEAR));

  final Interval twoYearsAndAWeek = Intervals.of("2000-01-01/2002-01-08");
  Assert.assertFalse(IntervalUtils.isAligned(twoYearsAndAWeek, Granularities.YEAR));
}
@Test
public void test_doesIntervalMatchesGranularity_withPeriodGranularity()
{
  final Interval threeDays = Intervals.of("2000-01-01/2000-01-04");

  // Origin coincides with the start of the interval: expected to be aligned.
  final PeriodGranularity originAtIntervalStart =
      new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-01"), null);
  Assert.assertTrue(IntervalUtils.isAligned(threeDays, originAtIntervalStart));

  // Origin is one day after the start of the interval: expected not to be aligned.
  final PeriodGranularity originOneDayLater =
      new PeriodGranularity(new Period("P3D"), DateTimes.of("2000-01-02"), null);
  Assert.assertFalse(IntervalUtils.isAligned(threeDays, originOneDayLater));
}
public static List<Interval> intervals(final String... intervals)
{
return Arrays.stream(intervals).map(Intervals::of).collect(Collectors.toList());

View File

@ -145,7 +145,8 @@ public abstract class Granularity implements Cacheable
public abstract DateTime toDate(String filePath, Formatter formatter);
/**
* Return true if time chunks populated by this granularity includes the given interval time chunk.
* Return true only if the time chunks populated by this granularity include the given interval time chunk. The
* interval must fit exactly into the scheme of the granularity for this to return true.
*/
public abstract boolean isAligned(Interval interval);