Reduce coordinator logs when operating normally (#14926)

Changes:
- Reduce log level of some coordinator stats, which only denote normal coordinator operation.
These stats are still emitted and can be logged by setting debugDimensions in the coordinator
dynamic config.
- Initialize SegmentLoadingConfig only for historical management duties. This config is not
needed in other duties and initializing it creates logs which are misleading.
This commit is contained in:
Kashif Faraz 2023-08-30 11:30:38 +05:30 committed by GitHub
parent d201ea0ece
commit 8263f0d1e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 38 additions and 36 deletions

View File

@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator; package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DataSourcesSnapshot;
@ -256,7 +255,6 @@ public class DruidCoordinatorRuntimeParams
public DruidCoordinatorRuntimeParams build() public DruidCoordinatorRuntimeParams build()
{ {
initStatsIfRequired(); initStatsIfRequired();
initSegmentLoadingConfigIfRequired();
initSegmentAssignerIfRequired(); initSegmentAssignerIfRequired();
return new DruidCoordinatorRuntimeParams( return new DruidCoordinatorRuntimeParams(
@ -282,15 +280,10 @@ public class DruidCoordinatorRuntimeParams
stats = stats == null ? new CoordinatorRunStats(debugDimensions) : stats; stats = stats == null ? new CoordinatorRunStats(debugDimensions) : stats;
} }
private void initSegmentLoadingConfigIfRequired() /**
{ * Initializes {@link StrategicSegmentAssigner} used by historical management
if (segmentLoadingConfig == null * duties for segment load/drop/move.
&& coordinatorDynamicConfig != null */
&& usedSegments != null) {
segmentLoadingConfig = SegmentLoadingConfig.create(coordinatorDynamicConfig, usedSegments.size());
}
}
private void initSegmentAssignerIfRequired() private void initSegmentAssignerIfRequired()
{ {
if (segmentAssigner != null || loadQueueManager == null) { if (segmentAssigner != null || loadQueueManager == null) {
@ -299,8 +292,13 @@ public class DruidCoordinatorRuntimeParams
Preconditions.checkNotNull(druidCluster); Preconditions.checkNotNull(druidCluster);
Preconditions.checkNotNull(balancerStrategy); Preconditions.checkNotNull(balancerStrategy);
Preconditions.checkNotNull(segmentLoadingConfig); Preconditions.checkNotNull(usedSegments);
Preconditions.checkNotNull(stats); Preconditions.checkNotNull(stats);
if (segmentLoadingConfig == null) {
segmentLoadingConfig = SegmentLoadingConfig.create(coordinatorDynamicConfig, usedSegments.size());
}
segmentAssigner = new StrategicSegmentAssigner( segmentAssigner = new StrategicSegmentAssigner(
loadQueueManager, loadQueueManager,
druidCluster, druidCluster,
@ -339,16 +337,12 @@ public class DruidCoordinatorRuntimeParams
return this; return this;
} }
/** This method must be used in test code only. */ public Builder withUsedSegments(DataSegment... usedSegments)
@VisibleForTesting
public Builder withUsedSegmentsInTest(DataSegment... usedSegments)
{ {
return withUsedSegmentsInTest(Arrays.asList(usedSegments)); return withUsedSegments(Arrays.asList(usedSegments));
} }
/** This method must be used in test code only. */ public Builder withUsedSegments(Collection<DataSegment> usedSegments)
@VisibleForTesting
public Builder withUsedSegmentsInTest(Collection<DataSegment> usedSegments)
{ {
this.usedSegments = createUsedSegmentsSet(usedSegments); this.usedSegments = createUsedSegmentsSet(usedSegments);
this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments, ImmutableMap.of()); this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments, ImmutableMap.of());
@ -361,6 +355,12 @@ public class DruidCoordinatorRuntimeParams
return this; return this;
} }
public Builder withSegmentLoadingConfig(SegmentLoadingConfig config)
{
this.segmentLoadingConfig = config;
return this;
}
public Builder withCompactionConfig(CoordinatorCompactionConfig config) public Builder withCompactionConfig(CoordinatorCompactionConfig config)
{ {
this.coordinatorCompactionConfig = config; this.coordinatorCompactionConfig = config;

View File

@ -84,7 +84,8 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
taskMaster.resetPeonsForNewServers(currentServers); taskMaster.resetPeonsForNewServers(currentServers);
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig(); final SegmentLoadingConfig segmentLoadingConfig
= SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegments().size());
final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster); cancelLoadsOnDecommissioningServers(cluster);
@ -103,6 +104,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
return params.buildFromExisting() return params.buildFromExisting()
.withDruidCluster(cluster) .withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy) .withBalancerStrategy(balancerStrategy)
.withSegmentLoadingConfig(segmentLoadingConfig)
.withSegmentAssignerUsing(loadQueueManager) .withSegmentAssignerUsing(loadQueueManager)
.build(); .build();
} }

View File

@ -28,9 +28,9 @@ public class Stats
{ {
// Decisions taken in a run // Decisions taken in a run
public static final CoordinatorStat ASSIGNED public static final CoordinatorStat ASSIGNED
= CoordinatorStat.toLogAndEmit("assigned", "segment/assigned/count", CoordinatorStat.Level.INFO); = CoordinatorStat.toDebugAndEmit("assigned", "segment/assigned/count");
public static final CoordinatorStat DROPPED public static final CoordinatorStat DROPPED
= CoordinatorStat.toLogAndEmit("dropped", "segment/dropped/count", CoordinatorStat.Level.INFO); = CoordinatorStat.toDebugAndEmit("dropped", "segment/dropped/count");
public static final CoordinatorStat DELETED public static final CoordinatorStat DELETED
= CoordinatorStat.toLogAndEmit("deleted", "segment/deleted/count", CoordinatorStat.Level.INFO); = CoordinatorStat.toLogAndEmit("deleted", "segment/deleted/count", CoordinatorStat.Level.INFO);
public static final CoordinatorStat MOVED public static final CoordinatorStat MOVED

View File

@ -130,7 +130,7 @@ public class BalanceSegmentsProfiler
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc()) .newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withUsedSegmentsInTest(segments) .withUsedSegments(segments)
.withDynamicConfigs( .withDynamicConfigs(
CoordinatorDynamicConfig CoordinatorDynamicConfig
.builder() .builder()
@ -185,7 +185,7 @@ public class BalanceSegmentsProfiler
) )
.build() .build()
) )
.withUsedSegmentsInTest(segments) .withUsedSegments(segments)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withSegmentAssignerUsing(loadQueueManager) .withSegmentAssignerUsing(loadQueueManager)
.build(); .build();

View File

@ -43,6 +43,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.MetadataRuleManager;
@ -80,7 +81,6 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -165,7 +165,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
); );
loadQueuePeon.start(); loadQueuePeon.start();
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
scheduledExecutorFactory = (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor(); scheduledExecutorFactory = ScheduledExecutors::fixed;
leaderAnnouncerLatch = new CountDownLatch(1); leaderAnnouncerLatch = new CountDownLatch(1);
leaderUnannouncerLatch = new CountDownLatch(1); leaderUnannouncerLatch = new CountDownLatch(1);
coordinator = new DruidCoordinator( coordinator = new DruidCoordinator(

View File

@ -395,7 +395,7 @@ public class BalanceSegmentsTest
return DruidCoordinatorRuntimeParams return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc()) .newBuilder(DateTimes.nowUtc())
.withDruidCluster(DruidCluster.builder().addTier("normal", servers).build()) .withDruidCluster(DruidCluster.builder().addTier("normal", servers).build())
.withUsedSegmentsInTest(allSegments) .withUsedSegments(allSegments)
.withBroadcastDatasources(broadcastDatasources) .withBroadcastDatasources(broadcastDatasources)
.withBalancerStrategy(balancerStrategy) .withBalancerStrategy(balancerStrategy)
.withSegmentAssignerUsing(loadQueueManager); .withSegmentAssignerUsing(loadQueueManager);

View File

@ -48,7 +48,7 @@ public class CollectSegmentAndServerStatsTest
DruidCoordinatorRuntimeParams runtimeParams = DruidCoordinatorRuntimeParams runtimeParams =
DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc()) DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc())
.withDruidCluster(DruidCluster.EMPTY) .withDruidCluster(DruidCluster.EMPTY)
.withUsedSegmentsInTest() .withUsedSegments()
.withBalancerStrategy(new RandomBalancerStrategy()) .withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null)) .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
.build(); .build();

View File

@ -337,7 +337,7 @@ public class RunRulesTest
return DruidCoordinatorRuntimeParams return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc().minusDays(1)) .newBuilder(DateTimes.nowUtc().minusDays(1))
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withUsedSegmentsInTest(dataSegments) .withUsedSegments(dataSegments)
.withDatabaseRuleManager(databaseRuleManager); .withDatabaseRuleManager(databaseRuleManager);
} }
@ -830,7 +830,7 @@ public class RunRulesTest
stats = runDutyAndGetStats( stats = runDutyAndGetStats(
createCoordinatorRuntimeParams(druidCluster) createCoordinatorRuntimeParams(druidCluster)
.withUsedSegmentsInTest(overFlowSegment) .withUsedSegments(overFlowSegment)
.withBalancerStrategy(balancerStrategy) .withBalancerStrategy(balancerStrategy)
.withSegmentAssignerUsing(loadQueueManager) .withSegmentAssignerUsing(loadQueueManager)
.build() .build()
@ -950,7 +950,7 @@ public class RunRulesTest
.build(); .build();
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
.withUsedSegmentsInTest(longerUsedSegments) .withUsedSegments(longerUsedSegments)
.withBalancerStrategy(new CostBalancerStrategy(balancerExecutor)) .withBalancerStrategy(new CostBalancerStrategy(balancerExecutor))
.withSegmentAssignerUsing(loadQueueManager) .withSegmentAssignerUsing(loadQueueManager)
.build(); .build();
@ -1004,7 +1004,7 @@ public class RunRulesTest
).build(); ).build();
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
.withUsedSegmentsInTest(usedSegments) .withUsedSegments(usedSegments)
.withBalancerStrategy(new CostBalancerStrategy(balancerExecutor)) .withBalancerStrategy(new CostBalancerStrategy(balancerExecutor))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.withSegmentAssignerUsing(loadQueueManager) .withSegmentAssignerUsing(loadQueueManager)

View File

@ -259,7 +259,7 @@ public class UnloadUnusedSegmentsTest
.addRealtimes(new ServerHolder(indexerServer, indexerPeon, false)) .addRealtimes(new ServerHolder(indexerServer, indexerPeon, false))
.build() .build()
) )
.withUsedSegmentsInTest(usedSegments) .withUsedSegments(usedSegments)
.withBroadcastDatasources(Collections.singleton(broadcastDatasource)) .withBroadcastDatasources(Collections.singleton(broadcastDatasource))
.withDatabaseRuleManager(databaseRuleManager) .withDatabaseRuleManager(databaseRuleManager)
.build(); .build();

View File

@ -236,7 +236,7 @@ public class BroadcastDistributionRuleTest
return DruidCoordinatorRuntimeParams return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc()) .newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withUsedSegmentsInTest(usedSegments) .withUsedSegments(usedSegments)
.withBalancerStrategy(new RandomBalancerStrategy()) .withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null)) .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
.build(); .build();

View File

@ -145,7 +145,7 @@ public class LoadRuleTest
.newBuilder(DateTimes.nowUtc()) .newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withBalancerStrategy(balancerStrategy) .withBalancerStrategy(balancerStrategy)
.withUsedSegmentsInTest(usedSegments) .withUsedSegments(usedSegments)
.withDynamicConfigs( .withDynamicConfigs(
CoordinatorDynamicConfig.builder() CoordinatorDynamicConfig.builder()
.withSmartSegmentLoading(false) .withSmartSegmentLoading(false)
@ -335,7 +335,7 @@ public class LoadRuleTest
.newBuilder(DateTimes.nowUtc()) .newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster) .withDruidCluster(druidCluster)
.withBalancerStrategy(balancerStrategy) .withBalancerStrategy(balancerStrategy)
.withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3) .withUsedSegments(dataSegment1, dataSegment2, dataSegment3)
.withDynamicConfigs( .withDynamicConfigs(
CoordinatorDynamicConfig.builder() CoordinatorDynamicConfig.builder()
.withSmartSegmentLoading(false) .withSmartSegmentLoading(false)