Allow coordinator run auto compaction duty period to be configured separately from other indexing duties (#12263)

* add impl

* add impl

* add unit tests

* add impl

* add impl

* add serde test

* add tests

* add docs

* fix test

* fix test

* fix docs

* fix docs

* fix spelling
This commit is contained in:
Maytas Monsereenusorn 2022-02-18 23:02:57 -08:00 committed by GitHub
parent 1ec57cb935
commit 6e2eded277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 365 additions and 15 deletions

View File

@ -98,6 +98,13 @@ Compaction tasks might fail due to the following reasons.
Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run.
Note that Compacting Segments Coordinator Duty is automatically enabled and run as part of the Indexing Service Duties group. However, Compacting Segments Coordinator Duty can be configured to run in isolation as a separate coordinator duty group. This allows changing the period of Compacting Segments Coordinator Duty without impacting the period of other Indexing Service Duties. This can be done by setting the following properties (for more details see [custom pluggable Coordinator Duty](../development/modules.md#adding-your-own-custom-pluggable-coordinator-duty)):
```
druid.coordinator.dutyGroups=[<SOME_GROUP_NAME>]
druid.coordinator.<SOME_GROUP_NAME>.duties=["compactSegments"]
druid.coordinator.<SOME_GROUP_NAME>.period=<PERIOD_TO_RUN_COMPACTING_SEGMENTS_DUTY>
```
### Segment search policy
#### Recent segment first policy

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
@ -158,7 +159,7 @@ public class DruidCoordinator
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
private final ObjectMapper objectMapper;
private final CompactSegments compactSegments;
private volatile boolean started = false;
@ -194,7 +195,7 @@ public class DruidCoordinator
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@Coordinator DruidLeaderSelector coordLeaderSelector,
CompactSegments compactSegments,
ObjectMapper objectMapper,
ZkEnablementConfig zkEnablementConfig
)
{
@ -219,7 +220,7 @@ public class DruidCoordinator
factory,
lookupCoordinatorManager,
coordLeaderSelector,
compactSegments,
objectMapper,
zkEnablementConfig
);
}
@ -245,7 +246,7 @@ public class DruidCoordinator
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
CompactSegments compactSegments,
ObjectMapper objectMapper,
ZkEnablementConfig zkEnablementConfig
)
{
@ -276,8 +277,8 @@ public class DruidCoordinator
this.factory = factory;
this.lookupCoordinatorManager = lookupCoordinatorManager;
this.coordLeaderSelector = coordLeaderSelector;
this.compactSegments = compactSegments;
this.objectMapper = objectMapper;
this.compactSegments = initializeCompactSegmentsDuty();
}
public boolean isLeader()
@ -769,14 +770,17 @@ public class DruidCoordinator
);
}
private List<CoordinatorDuty> makeIndexingServiceDuties()
@VisibleForTesting
List<CoordinatorDuty> makeIndexingServiceDuties()
{
List<CoordinatorDuty> duties = new ArrayList<>();
duties.add(new LogUsedSegments());
duties.addAll(indexingServiceDuties);
// CompactSegmentsDuty should be the last duty as it can take a long time to complete
duties.addAll(makeCompactSegmentsDuty());
// We do not have to add compactSegments if it is already enabled in the custom duty group
if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
duties.addAll(makeCompactSegmentsDuty());
}
log.debug(
"Done making indexing service duties %s",
duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
@ -797,6 +801,31 @@ public class DruidCoordinator
return ImmutableList.copyOf(duties);
}
@VisibleForTesting
CompactSegments initializeCompactSegmentsDuty()
{
List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
return new CompactSegments(config, objectMapper, indexingServiceClient);
} else {
if (compactSegmentsDutyFromCustomGroups.size() > 1) {
log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.");
}
return compactSegmentsDutyFromCustomGroups.get(0);
}
}
@VisibleForTesting
List<CompactSegments> getCompactSegmentsDutyFromCustomGroups()
{
return customDutyGroups.getCoordinatorCustomDutyGroups()
.stream()
.flatMap(coordinatorCustomDutyGroup -> coordinatorCustomDutyGroup.getCustomDutyList().stream())
.filter(duty -> duty instanceof CompactSegments)
.map(duty -> (CompactSegments) duty)
.collect(Collectors.toList());
}
private List<CoordinatorDuty> makeCompactSegmentsDuty()
{
return ImmutableList.of(compactSegments);

View File

@ -19,6 +19,8 @@
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CompactSegments implements CoordinatorDuty
public class CompactSegments implements CoordinatorCustomDuty
{
static final String COMPACTION_TASK_COUNT = "compactTaskCount";
static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
@ -90,10 +92,11 @@ public class CompactSegments implements CoordinatorDuty
private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();
@Inject
@JsonCreator
public CompactSegments(
DruidCoordinatorConfig config,
ObjectMapper objectMapper,
IndexingServiceClient indexingServiceClient
@JacksonInject DruidCoordinatorConfig config,
@JacksonInject ObjectMapper objectMapper,
@JacksonInject IndexingServiceClient indexingServiceClient
)
{
this.policy = new NewestSegmentFirstPolicy(objectMapper);
@ -104,6 +107,18 @@ public class CompactSegments implements CoordinatorDuty
LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals);
}
@VisibleForTesting
public boolean isSkipLockedIntervals()
{
return skipLockedIntervals;
}
@VisibleForTesting
IndexingServiceClient getIndexingServiceClient()
{
return indexingServiceClient;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{

View File

@ -51,6 +51,7 @@ import org.apache.druid.initialization.DruidModule;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class),
@JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class),
})
@ExtensionPoint
public interface CoordinatorCustomDuty extends CoordinatorDuty

View File

@ -52,9 +52,12 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@ -74,6 +77,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -735,7 +739,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
DruidCoordinator c = new DruidCoordinator(
null,
druidCoordinatorConfig,
null,
configManager,
null,
@ -786,6 +790,226 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertFalse(firstExec == thirdExec);
}
@Test
public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
{
CoordinatorCustomDutyGroups emptyCustomDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of());
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "druid";
}
},
null,
segmentsMetadataManager,
serverInventoryView,
metadataRuleManager,
() -> curator,
serviceEmitter,
scheduledExecutorFactory,
null,
null,
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
@Override
public void unannounce(DruidNode node)
{
leaderUnannouncerLatch.countDown();
}
},
druidNode,
loadManagementPeons,
ImmutableSet.of(),
new HashSet<>(),
emptyCustomDutyGroups,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
ZkEnablementConfig.ENABLED
);
// Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
// CompactSegments should not exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
Assert.assertNotNull(duty);
Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
}
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments()
{
CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null)));
CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "druid";
}
},
null,
segmentsMetadataManager,
serverInventoryView,
metadataRuleManager,
() -> curator,
serviceEmitter,
scheduledExecutorFactory,
null,
null,
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
@Override
public void unannounce(DruidNode node)
{
leaderUnannouncerLatch.countDown();
}
},
druidNode,
loadManagementPeons,
ImmutableSet.of(),
new HashSet<>(),
customDutyGroups,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
ZkEnablementConfig.ENABLED
);
// Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
// CompactSegments should not exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
Assert.assertNotNull(duty);
Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
}
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
{
DruidCoordinatorConfig differentConfigUsedInCustomGroup = new TestDruidCoordinatorConfig(
new Duration(COORDINATOR_START_DELAY),
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
new Duration(COORDINATOR_PERIOD),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
10,
new Duration("PT0s"),
false
);
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null)));
CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "druid";
}
},
null,
segmentsMetadataManager,
serverInventoryView,
metadataRuleManager,
() -> curator,
serviceEmitter,
scheduledExecutorFactory,
null,
null,
new NoopServiceAnnouncer()
{
@Override
public void announce(DruidNode node)
{
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
@Override
public void unannounce(DruidNode node)
{
leaderUnannouncerLatch.countDown();
}
},
druidNode,
loadManagementPeons,
ImmutableSet.of(),
new HashSet<>(),
customDutyGroups,
new CostBalancerStrategyFactory(),
EasyMock.createNiceMock(LookupCoordinatorManager.class),
new TestDruidLeaderSelector(),
null,
ZkEnablementConfig.ENABLED
);
// Since CompactSegments is enabled in Custom Duty Group, then CompactSegments must not be created in IndexingServiceDuties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().noneMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
// CompactSegments should exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertFalse(compactSegmentsDutyFromCustomGroups.isEmpty());
Assert.assertEquals(1, compactSegmentsDutyFromCustomGroups.size());
Assert.assertNotNull(compactSegmentsDutyFromCustomGroups.get(0));
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.get(0) instanceof CompactSegments);
// CompactSegments returned by this method should be from the Custom Duty Group
CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
Assert.assertNotNull(duty);
Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
// We should get the CompactSegment from the custom duty group which was created with a different config than the config in DruidCoordinator
Assert.assertEquals(differentConfigUsedInCustomGroup.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
}
@Test(timeout = 3000)
public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
{

View File

@ -23,7 +23,6 @@ import org.joda.time.Duration;
public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
{
private final Duration coordinatorStartDelay;
private final Duration coordinatorPeriod;
private final Duration coordinatorIndexingPeriod;
@ -42,6 +41,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private final Duration coordinatorDatasourceKillDurationToRetain;
private final Duration getLoadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments;
private final boolean compactionSkipLockedIntervals;
public TestDruidCoordinatorConfig(
Duration coordinatorStartDelay,
@ -82,6 +82,50 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
this.compactionSkipLockedIntervals = true;
}
public TestDruidCoordinatorConfig(
Duration coordinatorStartDelay,
Duration coordinatorPeriod,
Duration coordinatorIndexingPeriod,
Duration metadataStoreManagementPeriod,
Duration loadTimeoutDelay,
Duration coordinatorKillPeriod,
Duration coordinatorKillDurationToRetain,
Duration coordinatorSupervisorKillPeriod,
Duration coordinatorSupervisorKillDurationToRetain,
Duration coordinatorAuditKillPeriod,
Duration coordinatorAuditKillDurationToRetain,
Duration coordinatorCompactionKillPeriod,
Duration coordinatorRuleKillPeriod,
Duration coordinatorRuleKillDurationToRetain,
Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments,
Duration getLoadQueuePeonRepeatDelay,
boolean compactionSkipLockedIntervals
)
{
this.coordinatorStartDelay = coordinatorStartDelay;
this.coordinatorPeriod = coordinatorPeriod;
this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
this.loadTimeoutDelay = loadTimeoutDelay;
this.coordinatorKillPeriod = coordinatorKillPeriod;
this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod;
this.coordinatorSupervisorKillDurationToRetain = coordinatorSupervisorKillDurationToRetain;
this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
this.coordinatorCompactionKillPeriod = coordinatorCompactionKillPeriod;
this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod;
this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain;
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
}
@Override
@ -191,4 +235,10 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
{
return getLoadQueuePeonRepeatDelay;
}
@Override
public boolean getCompactionSkipLockedIntervals()
{
return compactionSkipLockedIntervals;
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -38,6 +39,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
@ -237,6 +239,28 @@ public class CompactSegmentsTest
);
}
@Test
public void testSerde() throws Exception
{
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
JSON_MAPPER.setInjectableValues(
new InjectableValues.Std()
.addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
.addValue(ObjectMapper.class, JSON_MAPPER)
.addValue(IndexingServiceClient.class, indexingServiceClient)
);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments);
CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
Assert.assertNotNull(serdeCompactSegments);
Assert.assertEquals(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals(), serdeCompactSegments.isSkipLockedIntervals());
Assert.assertEquals(indexingServiceClient, serdeCompactSegments.getIndexingServiceClient());
}
@Test
public void testRun()
{