diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java index e7be88561d5..7fcbec55750 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -100,7 +100,7 @@ public class DataSegment implements Comparable, Overshadowable newContext = new HashMap<>(getContext()); newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId()); + newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, STORE_COMPACTION_STATE); // Set the priority of the compaction task. newContext.put(Tasks.PRIORITY_KEY, getPriority()); return newContext; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java index 502eb859618..abd542418e1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java @@ -51,7 +51,7 @@ public class Tasks public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout"; public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock"; /** - * This context is used in auto compaction. When it is set in the context, the segments created by the task + * This context is used in compaction. When it is set in the context, the segments created by the task * will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not. * See {@link org.apache.druid.timeline.DataSegment} and {@link * org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for more details. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 3cc99c9c6ff..616dd24978e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -140,7 +140,6 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) - .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true)) .build(); final Set compactedSegments = runTask(compactionTask); @@ -153,6 +152,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis lockGranularity == LockGranularity.TIME_CHUNK ? 
NumberedShardSpec.class : NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass() ); + // Expect compaction state to exist, since compaction state is stored by default Assert.assertEquals(expectedState, segment.getLastCompactionState()); } } @@ -174,7 +174,6 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) .tuningConfig(newTuningConfig(new HashedPartitionsSpec(null, 3, null), 2, true)) - .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true)) .build(); final Set compactedSegments = runTask(compactionTask); @@ -183,6 +182,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { + // Expect compaction state to exist, since compaction state is stored by default Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, segment.getLastCompactionState()); } @@ -205,7 +205,6 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 2, true)) - .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true)) .build(); final Set compactedSegments = runTask(compactionTask); @@ -214,11 +213,40 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()) ); for (DataSegment segment : compactedSegments) { + // Expect compaction state to exist, since compaction state is stored by default Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass()); Assert.assertEquals(expectedState, 
segment.getLastCompactionState()); } } + @Test + public void testRunCompactionStateNotStoreIfContextSetToFalse() + { + runIndexTask(null, true); + + final Builder builder = new Builder( + DATA_SOURCE, + getSegmentLoaderFactory(), + RETRY_POLICY_FACTORY + ); + final CompactionTask compactionTask = builder + .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) + .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING) + .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, false)) + .build(); + + final Set compactedSegments = runTask(compactionTask); + + for (DataSegment segment : compactedSegments) { + Assert.assertSame( + lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class, + segment.getShardSpec().getClass() + ); + // Expect compaction state to be absent, since storing it is explicitly disabled in the task context + Assert.assertEquals(null, segment.getLastCompactionState()); + } + } + @Test public void testCompactHashAndDynamicPartitionedSegments() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 719fba10bbe..08cd765a1fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -130,6 +130,8 @@ public class CompactionTaskRunTest extends IngestionTestBase false, 0 ); + + // Expect compaction state to exist after compaction as we store compaction state by default private static CompactionState DEFAULT_COMPACTION_STATE; private static final List TEST_ROWS = ImmutableList.of( @@ -761,6 +763,9 @@ public class CompactionTaskRunTest extends IngestionTestBase null ); + // This is a regular index task, so we need to explicitly add this context to store the CompactionState + 
indexTask.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); + final Pair> resultPair = runTask(indexTask); Assert.assertTrue(resultPair.lhs.isSuccess()); @@ -853,7 +858,6 @@ public class CompactionTaskRunTest extends IngestionTestBase final TaskToolbox box = createTaskToolbox(objectMapper, task); task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); - task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); if (task.isReady(box.getTaskActionClient())) { if (readyLatchToCountDown != null) { readyLatchToCountDown.countDown(); diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index d59b14fa98b..b09e9de9a16 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -15,6 +15,7 @@ "is_overshadowed": 0, "shardSpec": "NoneShardSpec", "dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]", - "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]" + "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]", + "last_compaction_state": null } ] diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 88d3ff9abb8..6e895772b33 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -67,7 +67,6 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.guice.SqlModule; -import 
org.apache.druid.timeline.PruneLastCompactionState; import org.apache.druid.timeline.PruneLoadSpec; import org.eclipse.jetty.server.Server; @@ -102,7 +101,6 @@ public class CliBroker extends ServerRunnable binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8282); binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true); - binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true); binder.bind(ResponseContextConfig.class).toInstance(ResponseContextConfig.newConfig(false)); binder.bind(CachingClusteredClient.class).in(LazySingleton.class); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 0237b86512c..16e5c0aa3b5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -145,6 +145,7 @@ public class SystemSchema extends AbstractSchema .add("shardSpec", ValueType.STRING) .add("dimensions", ValueType.STRING) .add("metrics", ValueType.STRING) + .add("last_compaction_state", ValueType.STRING) .build(); static final RowSignature SERVERS_SIGNATURE = RowSignature @@ -311,7 +312,8 @@ public class SystemSchema extends AbstractSchema val.isOvershadowed() ? 
IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE, segment.getShardSpec(), segment.getDimensions(), - segment.getMetrics() + segment.getMetrics(), + segment.getLastCompactionState() }; }); @@ -343,7 +345,8 @@ public class SystemSchema extends AbstractSchema IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed val.getValue().getSegment().getShardSpec(), val.getValue().getSegment().getDimensions(), - val.getValue().getSegment().getMetrics() + val.getValue().getSegment().getMetrics(), + null // unpublished segments from realtime tasks will not be compacted yet }; }); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a6db177116d..f943aafef54 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -47,6 +47,7 @@ import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -89,6 +90,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; +import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; @@ -271,8 +273,12 @@ public class SystemSchemaTest extends CalciteTestBase ); } + private 
final CompactionState expectedCompactionState = new CompactionState( + new DynamicPartitionsSpec(null, null), + Collections.singletonMap("test", "map") + ); - private final DataSegment publishedSegment1 = new DataSegment( + private final DataSegment publishedCompactedSegment1 = new DataSegment( "wikipedia1", Intervals.of("2007/2008"), "version1", @@ -280,10 +286,11 @@ public class SystemSchemaTest extends CalciteTestBase ImmutableList.of("dim1", "dim2"), ImmutableList.of("met1", "met2"), null, + expectedCompactionState, 1, 53000L ); - private final DataSegment publishedSegment2 = new DataSegment( + private final DataSegment publishedCompactedSegment2 = new DataSegment( "wikipedia2", Intervals.of("2008/2009"), "version2", @@ -291,10 +298,11 @@ public class SystemSchemaTest extends CalciteTestBase ImmutableList.of("dim1", "dim2"), ImmutableList.of("met1", "met2"), null, + expectedCompactionState, 1, 83000L ); - private final DataSegment publishedSegment3 = new DataSegment( + private final DataSegment publishedUncompactedSegment3 = new DataSegment( "wikipedia3", Intervals.of("2009/2010"), "version3", @@ -302,6 +310,7 @@ public class SystemSchemaTest extends CalciteTestBase ImmutableList.of("dim1", "dim2"), ImmutableList.of("met1", "met2"), null, + null, 1, 47000L ); @@ -495,7 +504,7 @@ public class SystemSchemaTest extends CalciteTestBase final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); - Assert.assertEquals(16, fields.size()); + Assert.assertEquals(17, fields.size()); final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks"); final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl()); @@ -518,9 +527,9 @@ public class SystemSchemaTest extends CalciteTestBase { final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper); final Set publishedSegments = new HashSet<>(Arrays.asList( - new 
SegmentWithOvershadowedStatus(publishedSegment1, true), - new SegmentWithOvershadowedStatus(publishedSegment2, false), - new SegmentWithOvershadowedStatus(publishedSegment3, false), + new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true), + new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false), + new SegmentWithOvershadowedStatus(publishedUncompactedSegment3, false), new SegmentWithOvershadowedStatus(segment1, true), new SegmentWithOvershadowedStatus(segment2, false) )); @@ -576,7 +585,8 @@ public class SystemSchemaTest extends CalciteTestBase 1L, //is_published 1L, //is_available 0L, //is_realtime - 1L //is_overshadowed + 1L, //is_overshadowed + null //last_compaction_state ); verifyRow( @@ -589,7 +599,8 @@ public class SystemSchemaTest extends CalciteTestBase 1L, //is_published 1L, //is_available 0L, //is_realtime - 0L //is_overshadowed + 0L, //is_overshadowed + null //last_compaction_state ); //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2 @@ -603,7 +614,8 @@ public class SystemSchemaTest extends CalciteTestBase 0L, //is_published 1L, //is_available 0L, //is_realtime - 0L //is_overshadowed + 0L, //is_overshadowed + null //last_compaction_state ); verifyRow( @@ -616,7 +628,8 @@ public class SystemSchemaTest extends CalciteTestBase 0L, //is_published 1L, //is_available 1L, //is_realtime - 0L //is_overshadowed + 0L, //is_overshadowed + null //last_compaction_state ); verifyRow( @@ -629,10 +642,12 @@ public class SystemSchemaTest extends CalciteTestBase 0L, //is_published 1L, //is_available 1L, //is_realtime - 0L //is_overshadowed + 0L, //is_overshadowed + null //last_compaction_state ); // wikipedia segments are published and unavailable, num_replicas is 0 + // wikipedia segments 1 and 2 are compacted while segment 3 is not verifyRow( rows.get(5), "wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1", @@ -643,7 +658,8 @@ public class SystemSchemaTest extends CalciteTestBase 1L, //is_published 0L, //is_available 0L, 
//is_realtime - 1L //is_overshadowed + 1L, //is_overshadowed + expectedCompactionState //last_compaction_state ); verifyRow( @@ -656,7 +672,8 @@ public class SystemSchemaTest extends CalciteTestBase 1L, //is_published 0L, //is_available 0L, //is_realtime - 0L //is_overshadowed + 0L, //is_overshadowed + expectedCompactionState //last_compaction_state ); verifyRow( @@ -669,7 +686,8 @@ public class SystemSchemaTest extends CalciteTestBase 1L, //is_published 0L, //is_available 0L, //is_realtime - 0L //is_overshadowed + 0L, //is_overshadowed + null //last_compaction_state ); // Verify value types. @@ -686,7 +704,8 @@ public class SystemSchemaTest extends CalciteTestBase long isPublished, long isAvailable, long isRealtime, - long isOvershadowed + long isOvershadowed, + CompactionState compactionState ) { Assert.assertEquals(segmentId, row[0].toString()); @@ -703,6 +722,7 @@ public class SystemSchemaTest extends CalciteTestBase Assert.assertEquals(isAvailable, row[10]); Assert.assertEquals(isRealtime, row[11]); Assert.assertEquals(isOvershadowed, row[12]); + Assert.assertEquals(compactionState, row[16]); } @Test @@ -1271,6 +1291,8 @@ public class SystemSchemaTest extends CalciteTestBase expectedClass = SegmentId.class; } else if (signature.getColumnName(i).equals("shardSpec")) { expectedClass = ShardSpec.class; + } else if (signature.getColumnName(i).equals("last_compaction_state")) { + expectedClass = CompactionState.class; } else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) { expectedClass = List.class; } else { diff --git a/website/.spelling b/website/.spelling index 02333c91ba0..9c3f1a0db65 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1498,6 +1498,7 @@ is_overshadowed is_published is_realtime java.sql.Types +last_compaction_state max_size num_replicas num_rows