Add storeCompactionState flag support to msq (#15965)

Compaction in the native engine by default records the state of compaction for each segment in the lastCompactionState segment field. This PR adds support for doing the same in the MSQ engine, targeted for future cases such as REPLACE and compaction done via MSQ.

Note that this PR doesn't implicitly store the compaction state for MSQ replace tasks; it is stored with flag "storeCompactionState": true in the query context.
This commit is contained in:
Vishesh Garg 2024-04-09 16:47:47 +05:30 committed by GitHub
parent 9a4fb58543
commit 3d595cfab1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 491 additions and 29 deletions

View File

@ -399,6 +399,7 @@ The following table lists the context parameters for the MSQ task engine:
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.<br /> This property comes into effect only when `selectDestination` is set to `durableStorage` | 100000 |
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation.<br /><br />Provide the column list as comma-separated values or as a JSON array in string form.| empty list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |
| `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE query stores as part of each segment's metadata a `lastCompactionState` field that captures the various specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` |
## Joins

View File

@ -20,6 +20,7 @@
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -41,6 +42,7 @@ import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -63,6 +65,9 @@ import org.apache.druid.frame.write.InvalidFieldException;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@ -77,6 +82,7 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction
import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@ -192,6 +198,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -199,6 +206,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
@ -208,6 +216,7 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@ -231,6 +240,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@ -242,6 +252,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
@ -1732,12 +1743,114 @@ public class ControllerImpl implements Controller
//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
.getBoolean(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
if (!segments.isEmpty() && storeCompactionState) {
DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
log.warn(
"storeCompactionState flag set for a non-REPLACE query [%s]. Ignoring the flag for now.",
queryDef.getQueryId()
);
} else {
DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
task(),
context.jsonMapper(),
dataSchema,
shardSpec,
queryDef.getQueryId()
);
segments = compactionStateAnnotateFunction.apply(segments);
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
}
}
private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
MSQControllerTask task,
ObjectMapper jsonMapper,
DataSchema dataSchema,
ShardSpec shardSpec,
String queryId
)
{
final MSQTuningConfig tuningConfig = task.getQuerySpec().getTuningConfig();
PartitionsSpec partitionSpec;
if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimenionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
}
Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec().getDestination())
.getSegmentGranularity();
GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity,
dataSchema.getGranularitySpec().getQueryGranularity(),
dataSchema.getGranularitySpec().isRollup(),
dataSchema.getGranularitySpec().inputIntervals()
);
DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
? null
: new ClientCompactionTaskTransformSpec(
dataSchema.getTransformSpec().getFilter()
).asMap(jsonMapper);
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(), new TypeReference<List<Object>>()
{
});
IndexSpec indexSpec = tuningConfig.getIndexSpec();
log.info("Query[%s] storing compaction state in segments.", queryId);
return CompactionState.addCompactionStateToSegments(
partitionSpec,
dimensionsSpec,
metricsSpec,
transformSpec,
indexSpec.asMap(jsonMapper),
granularitySpec.asMap(jsonMapper)
);
}
/**
* Clean up durable storage, if used for stage output.
* <p>
@ -1798,7 +1911,9 @@ public class ControllerImpl implements Controller
}
} else {
shuffleSpecFactory = querySpec.getDestination()
.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
.getShuffleSpecFactory(
MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
);
queryToPlan = querySpec.getQuery();
}
@ -1900,9 +2015,11 @@ public class ControllerImpl implements Controller
if (filesIterator.hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Found files at provided export destination[%s]. Export is only allowed to "
+ "an empty path. Please provide an empty path/subdirectory or move the existing files.",
exportStorageProvider.getBasePath());
.build(
"Found files at provided export destination[%s]. Export is only allowed to "
+ "an empty path. Please provide an empty path/subdirectory or move the existing files.",
exportStorageProvider.getBasePath()
);
}
}
catch (IOException e) {
@ -1934,7 +2051,6 @@ public class ControllerImpl implements Controller
}
private static DataSchema generateDataSchema(
MSQSpec querySpec,
RowSignature querySignature,
@ -2389,7 +2505,9 @@ public class ControllerImpl implements Controller
workerStatsMap = taskLauncher.getWorkerStats();
}
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null
? null
: segmentLoadWaiter.status();
return new MSQStatusReport(
taskState,

View File

@ -131,7 +131,7 @@ public class MultiStageQueryContext
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString();
public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
public static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
public static final String CTX_ROWS_PER_PAGE = "rowsPerPage";
static final int DEFAULT_ROWS_PER_PAGE = 100000;

View File

@ -23,18 +23,33 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@ -61,14 +76,12 @@ import java.util.TreeSet;
public class MSQReplaceTest extends MSQTestBase
{
private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK";
private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK =
private static final String WITH_REPLACE_LOCK_AND_COMPACTION_STATE = "with_replace_lock_and_compaction_state";
private static final Map<String, Object> QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE =
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
Tasks.TASK_LOCK_TYPE,
StringUtils.toLowerCase(TaskLockType.REPLACE.name())
)
.put(Tasks.TASK_LOCK_TYPE, StringUtils.toLowerCase(TaskLockType.REPLACE.name()))
.put(Tasks.STORE_COMPACTION_STATE_KEY, true)
.build();
public static Collection<Object[]> data()
@ -78,8 +91,8 @@ public class MSQReplaceTest extends MSQTestBase
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
{WITH_REPLACE_LOCK, QUERY_CONTEXT_WITH_REPLACE_LOCK}
};
{WITH_REPLACE_LOCK_AND_COMPACTION_STATE, QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE},
};
return Arrays.asList(data);
}
@MethodSource("data")
@ -161,6 +174,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(6),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.DAY
)
)
.verifyResults();
}
@ -211,6 +232,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(1),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.DAY
)
)
.verifyResults();
}
@ -293,6 +322,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(1, 1, 1).frames(1, 1, 1),
1, 0, "shuffle"
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new LongDimensionSchema("cnt")),
GranularityType.HOUR
)
)
.verifyResults();
}
@ -360,6 +397,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(4),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new StringDimensionSchema("user")),
GranularityType.HOUR
)
)
.verifyResults();
}
@ -433,6 +478,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(6),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.ALL
)
)
.verifyResults();
}
@ -517,6 +570,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(6),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.MONTH
)
)
.verifyResults();
}
@ -591,6 +652,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(2),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.MONTH
)
)
.verifyResults();
}
@ -668,6 +737,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(2),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.MONTH
)
)
.verifyResults();
}
@ -735,7 +812,6 @@ public class MSQReplaceTest extends MSQTestBase
+ "WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-01-03' "
+ "PARTITIONED BY MONTH")
.setExpectedDataSource("foo")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Collections.singletonList(Intervals.of("2000-01-01T/2000-03-01T")))
@ -749,6 +825,13 @@ public class MSQReplaceTest extends MSQTestBase
ImmutableList.of(
new Object[]{946684800000L, 1.0f},
new Object[]{946771200000L, 2.0f}
))
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.MONTH
)
)
.verifyResults();
@ -807,6 +890,14 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{946771200000L, 2.0f}
)
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.MONTH
)
)
.verifyResults();
}
@ -860,6 +951,14 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{946771200000L, 2.0f}
)
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.MONTH
)
)
.verifyResults();
}
@ -889,6 +988,17 @@ public class MSQReplaceTest extends MSQTestBase
.setQueryContext(context)
.setExpectedSegment(expectedFooSegments())
.setExpectedResultRows(expectedFooRows())
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.singletonList("dim1"),
Arrays.asList(
new StringDimensionSchema("dim1"),
new LongDimensionSchema("cnt")
),
GranularityType.DAY
)
)
.verifyResults();
}
@ -961,6 +1071,72 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{978480000000L, 6.0f}
)
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.ALL
)
)
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceSegmentsWithQuarterSegmentGranularity(String contextName, Map<String, Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.add("m2", ColumnType.DOUBLE)
.build();
testIngestQuery().setSql(" REPLACE INTO foobar "
+ "OVERWRITE ALL "
+ "SELECT __time, m1, m2 "
+ "FROM foo "
+ "PARTITIONED by TIME_FLOOR(__time, 'P3M') ")
.setExpectedDataSource("foobar")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
.setExpectedSegment(
ImmutableSet.of(
SegmentId.of(
"foobar",
Intervals.of(
"2000-01-01T00:00:00.000Z/2000-04-01T00:00:00.000Z"),
"test",
0
),
SegmentId.of(
"foobar",
Intervals.of(
"2001-01-01T00:00:00.000Z/2001-04-01T00:00:00.000Z"),
"test",
0
)
)
)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, 1.0f, 1.0},
new Object[]{946771200000L, 2.0f, 2.0},
new Object[]{946857600000L, 3.0f, 3.0},
new Object[]{978307200000L, 4.0f, 4.0},
new Object[]{978393600000L, 5.0f, 5.0},
new Object[]{978480000000L, 6.0f, 6.0}
)
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Arrays.asList(new FloatDimensionSchema("m1"), new DoubleDimensionSchema("m2")),
GranularityType.QUARTER
)
)
.verifyResults();
}
@ -1045,6 +1221,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(8),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new StringDimensionSchema("d")),
GranularityType.ALL
)
)
.verifyResults();
}
@ -1109,6 +1293,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(12),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("d")),
GranularityType.ALL
)
)
.verifyResults();
}
@ -1184,6 +1376,14 @@ public class MSQReplaceTest extends MSQTestBase
.with().segmentRowsProcessed(8),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.singletonList("d"),
Collections.singletonList(new StringDimensionSchema("d")),
GranularityType.DAY
)
)
.verifyResults();
}
@ -1660,4 +1860,50 @@ public class MSQReplaceTest extends MSQTestBase
));
return expectedRows;
}
private CompactionState expectedCompactionState(
Map<String, Object> context,
List<String> partitionDimensions,
List<DimensionSchema> dimensions,
GranularityType segmentGranularity
)
{
if (!context.containsKey(Tasks.STORE_COMPACTION_STATE_KEY)
|| !((Boolean) context.get(Tasks.STORE_COMPACTION_STATE_KEY))) {
return null;
}
PartitionsSpec partitionsSpec;
if (partitionDimensions.isEmpty()) {
partitionsSpec = new DynamicPartitionsSpec(MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT, Long.MAX_VALUE);
} else {
partitionsSpec = new DimensionRangePartitionsSpec(MultiStageQueryContext.DEFAULT_ROWS_PER_SEGMENT, null,
partitionDimensions, false
);
}
DimensionsSpec dimensionsSpec = new DimensionsSpec.Builder()
.setDimensions(dimensions)
.setDimensionExclusions(Collections.singletonList(
"__time"))
.build();
IndexSpec indexSpec = new IndexSpec(null, null, null, null, null, null, null);
GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity.getDefaultGranularity(),
GranularityType.NONE.getDefaultGranularity(),
false,
Intervals.ONLY_ETERNITY
);
List<Object> metricsSpec = Collections.emptyList();
return new CompactionState(
partitionsSpec,
dimensionsSpec,
metricsSpec,
null,
indexSpec.asMap(objectMapper),
granularitySpec.asMap(objectMapper)
);
}
}

View File

@ -187,6 +187,7 @@ import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.local.LocalFileStorageConnector;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.PruneLoadSpec;
import org.apache.druid.timeline.SegmentId;
@ -855,6 +856,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected MSQSpec expectedMSQSpec = null;
protected MSQTuningConfig expectedTuningConfig = null;
protected Set<SegmentId> expectedSegments = null;
protected CompactionState expectedLastCompactionState = null;
protected Set<Interval> expectedTombstoneIntervals = null;
protected List<Object[]> expectedResultRows = null;
protected Matcher<Throwable> expectedValidationErrorMatcher = null;
@ -902,6 +904,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
public Builder setExpectedLastCompactionState(CompactionState expectedLastCompactionState)
{
this.expectedLastCompactionState = expectedLastCompactionState;
return asBuilder();
}
public Builder setExpectedTombstoneIntervals(Set<Interval> tombstoneIntervals)
{
Preconditions.checkArgument(!tombstoneIntervals.isEmpty(), "Segments cannot be empty");
@ -1278,6 +1286,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
// SegmentGeneratorFrameProcessorFactory. We can get the tombstone segment ids published by taking a set
// difference of all the segments published with the segments that are created by the SegmentGeneratorFrameProcessorFactory
if (!testTaskActionClient.getPublishedSegments().isEmpty()) {
if (expectedLastCompactionState != null) {
CompactionState compactionState = testTaskActionClient.getPublishedSegments().stream().findFirst().get()
.getLastCompactionState();
Assert.assertEquals(expectedLastCompactionState, compactionState);
}
Set<SegmentId> publishedSegmentIds = testTaskActionClient.getPublishedSegments()
.stream()
.map(DataSegment::getId)
@ -1495,4 +1509,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
return retVal;
}
}

View File

@ -607,7 +607,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
return tuningConfig.isForceGuaranteedRollup();
}
public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
boolean storeCompactionState,
TaskToolbox toolbox,
IngestionSpec ingestionSpec
@ -628,7 +628,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
? null
: toolbox.getJsonMapper().convertValue(ingestionSpec.getDataSchema().getAggregators(), new TypeReference<List<Object>>() {});
final CompactionState compactionState = new CompactionState(
return CompactionState.addCompactionStateToSegments(
tuningConfig.getPartitionsSpec(),
dimensionsSpec,
metricsSpec,
@ -636,10 +636,6 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()),
granularitySpec.asMap(toolbox.getJsonMapper())
);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
.collect(Collectors.toSet());
} else {
return Function.identity();
}

View File

@ -1221,10 +1221,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec;
partitionsSpec = new DynamicPartitionsSpec(
dynamicPartitionsSpec.getMaxRowsPerSegment(),
// Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment.
// If this is set to something too small, compactionTask can generate small segments
// which need to be compacted again, which in turn making auto compaction stuck in the same interval.
dynamicPartitionsSpec.getMaxTotalRowsOr(Long.MAX_VALUE)
dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_COMPACTION_MAX_TOTAL_ROWS)
);
}
return newTuningConfig.withPartitionsSpec(partitionsSpec);

View File

@ -923,7 +923,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
compactionStateAnnotateFunction(
addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema

View File

@ -1149,7 +1149,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = addCompactionStateToSegments(
storeCompactionState,
toolbox,
ingestionSchema

View File

@ -34,6 +34,10 @@ public class DynamicPartitionsSpec implements PartitionsSpec
* Default maxTotalRows for most task types except compaction task.
*/
public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
// Using MAX_VALUE as the default for setting maxTotalRows for compaction to respect the computed maxRowsPerSegment.
// If this is set to something too small, compactionTask can generate small segments
// which need to be compacted again, which in turn making auto compaction stuck in the same interval.
public static final long DEFAULT_COMPACTION_MAX_TOTAL_ROWS = Long.MAX_VALUE;
static final String NAME = "dynamic";
private final int maxRowsPerSegment;

View File

@ -27,6 +27,9 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* This class describes what compaction task spec was used to create a given segment.
@ -146,4 +149,29 @@ public class CompactionState
", metricsSpec=" + metricsSpec +
'}';
}
public static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
PartitionsSpec partitionsSpec,
DimensionsSpec dimensionsSpec,
List<Object> metricsSpec,
Map<String, Object> transformSpec,
Map<String, Object> indexSpec,
Map<String, Object> granularitySpec
)
{
CompactionState compactionState = new CompactionState(
partitionsSpec,
dimensionsSpec,
metricsSpec,
transformSpec,
indexSpec,
granularitySpec
);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
.collect(Collectors.toSet());
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@ -47,6 +48,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
/**
*/
@ -365,6 +368,59 @@ public class DataSegmentTest
Assert.assertEquals(segment1, segment2.withLastCompactionState(compactionState));
}
@Test
public void testAnnotateWithLastCompactionState()
{
DynamicPartitionsSpec dynamicPartitionsSpec = new DynamicPartitionsSpec(null, null);
DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"bar",
"foo"
)));
List<Object> metricsSpec = ImmutableList.of(ImmutableMap.of("type", "count", "name", "count"));
Map<String, Object> transformSpec = ImmutableMap.of(
"filter",
ImmutableMap.of("type", "selector", "dimension", "dim1", "value", "foo")
);
Map<String, Object> indexSpec = Collections.singletonMap("test", "map");
Map<String, Object> granularitySpec = Collections.singletonMap("test2", "map");
final CompactionState compactionState = new CompactionState(
dynamicPartitionsSpec,
dimensionsSpec,
metricsSpec,
transformSpec,
indexSpec,
granularitySpec
);
final Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateFunction =
CompactionState.addCompactionStateToSegments(
dynamicPartitionsSpec,
dimensionsSpec,
metricsSpec,
transformSpec,
indexSpec,
granularitySpec
);
final DataSegment segment1 = DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2012-01-01/2012-01-02"))
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(getShardSpec(7))
.size(0)
.lastCompactionState(compactionState)
.build();
final DataSegment segment2 = DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2012-01-01/2012-01-02"))
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(getShardSpec(7))
.size(0)
.build();
Assert.assertEquals(ImmutableSet.of(segment1), addCompactionStateFunction.apply(ImmutableSet.of(segment2)));
}
@Test
public void testTombstoneType()
{