From 6e2eded277bec907466ce0b7823ce70fae6e388e Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 18 Feb 2022 23:02:57 -0800 Subject: [PATCH] Allow coordinator run auto compaction duty period to be configured separately from other indexing duties (#12263) * add impl * add impl * add unit tests * add impl * add impl * add serde test * add tests * add docs * fix test * fix test * fix docs * fix docs * fix spelling --- docs/design/coordinator.md | 7 + .../server/coordinator/DruidCoordinator.java | 47 +++- .../coordinator/duty/CompactSegments.java | 23 +- .../duty/CoordinatorCustomDuty.java | 1 + .../coordinator/DruidCoordinatorTest.java | 226 +++++++++++++++++- .../TestDruidCoordinatorConfig.java | 52 +++- .../coordinator/duty/CompactSegmentsTest.java | 24 ++ 7 files changed, 365 insertions(+), 15 deletions(-) diff --git a/docs/design/coordinator.md b/docs/design/coordinator.md index d5769a935ce..a3d33eca9ff 100644 --- a/docs/design/coordinator.md +++ b/docs/design/coordinator.md @@ -98,6 +98,13 @@ Compaction tasks might fail due to the following reasons. Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run. +Note that Compacting Segments Coordinator Duty is automatically enabled and run as part of the Indexing Service Duties group. However, Compacting Segments Coordinator Duty can be configured to run in isolation as a separate coordinator duty group. This allows changing the period of Compacting Segments Coordinator Duty without impacting the period of other Indexing Service Duties. This can be done by setting the following properties (for more details see [custom pluggable Coordinator Duty](../development/modules.md#adding-your-own-custom-pluggable-coordinator-duty)): +``` +druid.coordinator.dutyGroups=[] +druid.coordinator..duties=["compactSegments"] +druid.coordinator..period= +``` + ### Segment search policy #### Recent segment first policy diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index c901cb96763..8c060ee321f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; @@ -158,7 +159,7 @@ public class DruidCoordinator private final BalancerStrategyFactory factory; private final LookupCoordinatorManager lookupCoordinatorManager; private final DruidLeaderSelector coordLeaderSelector; - + private final ObjectMapper objectMapper; private final CompactSegments compactSegments; private volatile boolean started = false; @@ -194,7 +195,7 @@ public class DruidCoordinator BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, @Coordinator DruidLeaderSelector coordLeaderSelector, - CompactSegments compactSegments, + ObjectMapper objectMapper, ZkEnablementConfig zkEnablementConfig ) { @@ -219,7 +220,7 @@ public class DruidCoordinator factory, lookupCoordinatorManager, coordLeaderSelector, - compactSegments, + objectMapper, zkEnablementConfig ); } @@ -245,7 +246,7 @@ public class DruidCoordinator BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, DruidLeaderSelector coordLeaderSelector, - CompactSegments compactSegments, + ObjectMapper objectMapper, ZkEnablementConfig zkEnablementConfig ) { @@ -276,8 +277,8 @@ public class DruidCoordinator this.factory = factory; this.lookupCoordinatorManager = lookupCoordinatorManager; this.coordLeaderSelector = coordLeaderSelector; - - this.compactSegments = compactSegments; + this.objectMapper = objectMapper; + this.compactSegments = initializeCompactSegmentsDuty(); } public boolean isLeader() @@ -769,14 +770,17 @@ public class DruidCoordinator ); } - private List makeIndexingServiceDuties() + @VisibleForTesting + List makeIndexingServiceDuties() { List duties = new ArrayList<>(); duties.add(new LogUsedSegments()); duties.addAll(indexingServiceDuties); // CompactSegmentsDuty should be the last duty as it can take a long time to complete - duties.addAll(makeCompactSegmentsDuty()); - + // We do not have to add compactSegments if it is already enabled in the custom duty group + if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) { + duties.addAll(makeCompactSegmentsDuty()); + } log.debug( "Done making indexing service duties %s", duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) @@ -797,6 +801,31 @@ public class DruidCoordinator return ImmutableList.copyOf(duties); } + @VisibleForTesting + CompactSegments initializeCompactSegmentsDuty() + { + List compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups(); + if (compactSegmentsDutyFromCustomGroups.isEmpty()) { + return new CompactSegments(config, objectMapper, indexingServiceClient); + } else { + if (compactSegmentsDutyFromCustomGroups.size() > 1) { + log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up."); + } + return compactSegmentsDutyFromCustomGroups.get(0); + } + } + + @VisibleForTesting + List getCompactSegmentsDutyFromCustomGroups() + { + return customDutyGroups.getCoordinatorCustomDutyGroups() + .stream() + .flatMap(coordinatorCustomDutyGroup -> coordinatorCustomDutyGroup.getCustomDutyList().stream()) + .filter(duty -> duty instanceof CompactSegments) + .map(duty -> (CompactSegments) duty) + .collect(Collectors.toList()); + } + private List makeCompactSegmentsDuty() { return ImmutableList.of(compactSegments); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 6598a6ede18..e3f47ddf247 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -19,6 +19,8 @@ package org.apache.druid.server.coordinator.duty; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; @@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; -public class CompactSegments implements CoordinatorDuty +public class CompactSegments implements CoordinatorCustomDuty { static final String COMPACTION_TASK_COUNT = "compactTaskCount"; static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot"; @@ -90,10 +92,11 @@ public class CompactSegments implements CoordinatorDuty private final AtomicReference> autoCompactionSnapshotPerDataSource = new AtomicReference<>(); @Inject + @JsonCreator public CompactSegments( - DruidCoordinatorConfig config, - ObjectMapper objectMapper, - IndexingServiceClient indexingServiceClient + @JacksonInject DruidCoordinatorConfig config, + @JacksonInject ObjectMapper objectMapper, + @JacksonInject IndexingServiceClient indexingServiceClient ) { this.policy = new NewestSegmentFirstPolicy(objectMapper); @@ -104,6 +107,18 @@ public class CompactSegments implements CoordinatorDuty LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals); } + @VisibleForTesting + public boolean isSkipLockedIntervals() + { + return skipLockedIntervals; + } + + @VisibleForTesting + IndexingServiceClient getIndexingServiceClient() + { + return indexingServiceClient; + } + @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java index 86e2032c38f..d48171472dd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java @@ -51,6 +51,7 @@ import org.apache.druid.initialization.DruidModule; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ @JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class), + @JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class), }) @ExtensionPoint public interface CoordinatorCustomDuty extends CoordinatorDuty diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index ec0201bd5c1..036374828e6 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -52,9 +52,12 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; +import org.apache.druid.server.coordinator.duty.CoordinatorDuty; +import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; @@ -74,6 +77,7 @@ import org.junit.Test; import javax.annotation.Nullable; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -735,7 +739,7 @@ public class DruidCoordinatorTest extends CuratorTestBase EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory); DruidCoordinator c = new DruidCoordinator( - null, + druidCoordinatorConfig, null, configManager, null, @@ -786,6 +790,226 @@ public class DruidCoordinatorTest extends CuratorTestBase Assert.assertFalse(firstExec == thirdExec); } + @Test + public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty() + { + CoordinatorCustomDutyGroups emptyCustomDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of()); + coordinator = new DruidCoordinator( + druidCoordinatorConfig, + new ZkPathsConfig() + { + + @Override + public String getBase() + { + return "druid"; + } + }, + null, + segmentsMetadataManager, + serverInventoryView, + metadataRuleManager, + () -> curator, + serviceEmitter, + scheduledExecutorFactory, + null, + null, + new NoopServiceAnnouncer() + { + @Override + public void announce(DruidNode node) + { + // count down when this coordinator becomes the leader + leaderAnnouncerLatch.countDown(); + } + + @Override + public void unannounce(DruidNode node) + { + leaderUnannouncerLatch.countDown(); + } + }, + druidNode, + loadManagementPeons, + ImmutableSet.of(), + new HashSet<>(), + emptyCustomDutyGroups, + new CostBalancerStrategyFactory(), + EasyMock.createNiceMock(LookupCoordinatorManager.class), + new TestDruidLeaderSelector(), + null, + ZkEnablementConfig.ENABLED + ); + // Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties + List indexingDuties = coordinator.makeIndexingServiceDuties(); + Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments)); + + // CompactSegments should not exist in Custom Duty Group + List compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups(); + Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty()); + + // CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator + CompactSegments duty = coordinator.initializeCompactSegmentsDuty(); + Assert.assertNotNull(duty); + Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals()); + } + + @Test + public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments() + { + CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null))); + CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group)); + coordinator = new DruidCoordinator( + druidCoordinatorConfig, + new ZkPathsConfig() + { + + @Override + public String getBase() + { + return "druid"; + } + }, + null, + segmentsMetadataManager, + serverInventoryView, + metadataRuleManager, + () -> curator, + serviceEmitter, + scheduledExecutorFactory, + null, + null, + new NoopServiceAnnouncer() + { + @Override + public void announce(DruidNode node) + { + // count down when this coordinator becomes the leader + leaderAnnouncerLatch.countDown(); + } + + @Override + public void unannounce(DruidNode node) + { + leaderUnannouncerLatch.countDown(); + } + }, + druidNode, + loadManagementPeons, + ImmutableSet.of(), + new HashSet<>(), + customDutyGroups, + new CostBalancerStrategyFactory(), + EasyMock.createNiceMock(LookupCoordinatorManager.class), + new TestDruidLeaderSelector(), + null, + ZkEnablementConfig.ENABLED + ); + // Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties + List indexingDuties = coordinator.makeIndexingServiceDuties(); + Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments)); + + // CompactSegments should not exist in Custom Duty Group + List compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups(); + Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty()); + + // CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator + CompactSegments duty = coordinator.initializeCompactSegmentsDuty(); + Assert.assertNotNull(duty); + Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals()); + } + + @Test + public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments() + { + DruidCoordinatorConfig differentConfigUsedInCustomGroup = new TestDruidCoordinatorConfig( + new Duration(COORDINATOR_START_DELAY), + new Duration(COORDINATOR_PERIOD), + null, + null, + null, + new Duration(COORDINATOR_PERIOD), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 10, + new Duration("PT0s"), + false + ); + CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null))); + CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup)); + coordinator = new DruidCoordinator( + druidCoordinatorConfig, + new ZkPathsConfig() + { + + @Override + public String getBase() + { + return "druid"; + } + }, + null, + segmentsMetadataManager, + serverInventoryView, + metadataRuleManager, + () -> curator, + serviceEmitter, + scheduledExecutorFactory, + null, + null, + new NoopServiceAnnouncer() + { + @Override + public void announce(DruidNode node) + { + // count down when this coordinator becomes the leader + leaderAnnouncerLatch.countDown(); + } + + @Override + public void unannounce(DruidNode node) + { + leaderUnannouncerLatch.countDown(); + } + }, + druidNode, + loadManagementPeons, + ImmutableSet.of(), + new HashSet<>(), + customDutyGroups, + new CostBalancerStrategyFactory(), + EasyMock.createNiceMock(LookupCoordinatorManager.class), + new TestDruidLeaderSelector(), + null, + ZkEnablementConfig.ENABLED + ); + // Since CompactSegments is enabled in Custom Duty Group, then CompactSegments must not be created in IndexingServiceDuties + List indexingDuties = coordinator.makeIndexingServiceDuties(); + Assert.assertTrue(indexingDuties.stream().noneMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments)); + + // CompactSegments should exist in Custom Duty Group + List compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups(); + Assert.assertFalse(compactSegmentsDutyFromCustomGroups.isEmpty()); + Assert.assertEquals(1, compactSegmentsDutyFromCustomGroups.size()); + Assert.assertNotNull(compactSegmentsDutyFromCustomGroups.get(0)); + Assert.assertTrue(compactSegmentsDutyFromCustomGroups.get(0) instanceof CompactSegments); + + // CompactSegments returned by this method should be from the Custom Duty Group + CompactSegments duty = coordinator.initializeCompactSegmentsDuty(); + Assert.assertNotNull(duty); + Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals()); + // We should get the CompactSegment from the custom duty group which was created with a different config than the config in DruidCoordinator + Assert.assertEquals(differentConfigUsedInCustomGroup.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals()); + } + @Test(timeout = 3000) public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 196e205b5c9..abb500e9833 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -23,7 +23,6 @@ import org.joda.time.Duration; public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig { - private final Duration coordinatorStartDelay; private final Duration coordinatorPeriod; private final Duration coordinatorIndexingPeriod; @@ -42,6 +41,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorDatasourceKillDurationToRetain; private final Duration getLoadQueuePeonRepeatDelay; private final int coordinatorKillMaxSegments; + private final boolean compactionSkipLockedIntervals; public TestDruidCoordinatorConfig( Duration coordinatorStartDelay, @@ -82,6 +82,50 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain; this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay; + this.compactionSkipLockedIntervals = true; + } + + public TestDruidCoordinatorConfig( + Duration coordinatorStartDelay, + Duration coordinatorPeriod, + Duration coordinatorIndexingPeriod, + Duration metadataStoreManagementPeriod, + Duration loadTimeoutDelay, + Duration coordinatorKillPeriod, + Duration coordinatorKillDurationToRetain, + Duration coordinatorSupervisorKillPeriod, + Duration coordinatorSupervisorKillDurationToRetain, + Duration coordinatorAuditKillPeriod, + Duration coordinatorAuditKillDurationToRetain, + Duration coordinatorCompactionKillPeriod, + Duration coordinatorRuleKillPeriod, + Duration coordinatorRuleKillDurationToRetain, + Duration coordinatorDatasourceKillPeriod, + Duration coordinatorDatasourceKillDurationToRetain, + int coordinatorKillMaxSegments, + Duration getLoadQueuePeonRepeatDelay, + boolean compactionSkipLockedIntervals + ) + { + this.coordinatorStartDelay = coordinatorStartDelay; + this.coordinatorPeriod = coordinatorPeriod; + this.coordinatorIndexingPeriod = coordinatorIndexingPeriod; + this.metadataStoreManagementPeriod = metadataStoreManagementPeriod; + this.loadTimeoutDelay = loadTimeoutDelay; + this.coordinatorKillPeriod = coordinatorKillPeriod; + this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain; + this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod; + this.coordinatorSupervisorKillDurationToRetain = coordinatorSupervisorKillDurationToRetain; + this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod; + this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain; + this.coordinatorCompactionKillPeriod = coordinatorCompactionKillPeriod; + this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod; + this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain; + this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod; + this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain; + this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; + this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay; + this.compactionSkipLockedIntervals = compactionSkipLockedIntervals; } @Override @@ -191,4 +235,10 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig { return getLoadQueuePeonRepeatDelay; } + + @Override + public boolean getCompactionSkipLockedIntervals() + { + return compactionSkipLockedIntervals; + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 2892d44d928..bd4e6019c80 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.duty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -38,6 +39,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.HttpIndexingServiceClient; +import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo; import org.apache.druid.client.indexing.IndexingWorker; import org.apache.druid.client.indexing.IndexingWorkerInfo; @@ -237,6 +239,28 @@ public class CompactSegmentsTest ); } + @Test + public void testSerde() throws Exception + { + final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); + final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); + + JSON_MAPPER.setInjectableValues( + new InjectableValues.Std() + .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG) + .addValue(ObjectMapper.class, JSON_MAPPER) + .addValue(IndexingServiceClient.class, indexingServiceClient) + ); + + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient); + String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments); + CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class); + + Assert.assertNotNull(serdeCompactSegments); + Assert.assertEquals(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals(), serdeCompactSegments.isSkipLockedIntervals()); + Assert.assertEquals(indexingServiceClient, serdeCompactSegments.getIndexingServiceClient()); + } + @Test public void testRun() {