Use num cores to determine balancerComputeThreads (#14902)

Changes:
- Determine the default value of balancerComputeThreads based on the number of
Coordinator CPUs rather than the number of segments. Even if the number of segments
is low and we create more balancer threads, it doesn't hurt the system as the extra
threads would mostly stay idle.
- Remove unused field from SegmentLoadQueueManager

Expected values:
- Clusters with ~1M segments typically work with Coordinators having 16 cores or more.
This would give us 8 balancer threads, which is the same as the current maximum.
- On small clusters, even a single thread is enough to do the required balancing work.
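
For reference, a minimal sketch of the new default computation (the same rule as the `getDefaultBalancerComputeThreads()` method added in this commit, but using the JDK's `Runtime` API instead of Druid's `JvmUtils` so it can run standalone):

```java
public class BalancerThreadsDefaultSketch
{
  /**
   * Default balancerComputeThreads = max(1, num cores / 2).
   * A 16-core Coordinator gets 8 threads (the previous hard maximum),
   * while a 1- or 2-core Coordinator still gets a single thread.
   */
  public static int defaultBalancerComputeThreads()
  {
    // Stand-in for JvmUtils.getRuntimeInfo().getAvailableProcessors()
    final int numCores = Runtime.getRuntime().availableProcessors();
    return Math.max(1, numCores / 2);
  }

  public static void main(String[] args)
  {
    System.out.println("Default balancerComputeThreads = " + defaultBalancerComputeThreads());
  }
}
```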
Kashif Faraz 2023-08-25 08:15:27 +05:30 committed by GitHub
parent 388d5ecf78
commit e51181957c
18 changed files with 60 additions and 179 deletions

View File

@ -949,27 +949,27 @@ A sample Coordinator dynamic config JSON object is shown below:
Issuing a GET request at the same URL will return the spec that is currently in place. A description of the config setup spec is shown below.
|Property|Description|Default|
|--------|-----------|-------|
|`millisToWaitBeforeDeleting`|How long does the Coordinator need to be a leader before it can start marking overshadowed segments as unused in metadata storage.|900000 (15 mins)|
|`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L|
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100|
|`smartSegmentLoading`|Enables ["smart" segment loading mode](#smart-segment-loading) which dynamically computes the optimal values of several properties that maximize Coordinator performance.|true|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved in a Historical tier at any given time.|100|
|`replicantLifetime`|The maximum number of Coordinator runs for which a segment can wait in the load queue of a Historical before Druid raises an alert.|15|
|`replicationThrottleLimit`|The maximum number of segment replicas that can be assigned to a historical tier in a single Coordinator run. This property prevents historicals from becoming overwhelmed when loading extra replicas of segments that are already available in the cluster.|500|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall.|1|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall.|`num_cores` / 2|
|`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
|`killTaskSlotRatio`|Ratio of total available task slots, including autoscaling if applicable that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.|1 - all task slots can be used|
|`maxKillTaskSlots`|Maximum number of tasks that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.|2147483647 - no limit|
|`maxKillTaskSlots`|Maximum number of tasks that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.|`Integer.MAX_VALUE` - no limit|
|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the load queue of any given server. Use this parameter to load segments faster if, for example, the cluster contains slow-loading nodes or if there are too many segments to be replicated to a particular node (when faster loading is preferred to better segments distribution). The optimal value depends on the loading speed of segments, acceptable replication time and number of nodes.|500|
|`useRoundRobinSegmentAssignment`|Boolean flag for whether segments should be assigned to historicals in a round robin fashion. When disabled, segment assignment is done using the chosen balancer strategy. When enabled, this can speed up segment assignments leaving balancing to move the segments to their optimal locations (based on the balancer strategy) lazily.|true|
|`decommissioningNodes`|List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|`decommissioningMaxPercentOfMaxSegmentsToMove`|Upper limit of segments the Coordinator can move from decommissioning servers to active non-decommissioning servers during a single run. This value is relative to the total maximum number of segments that can be moved at any given time based upon the value of `maxSegmentsToMove`.<br /><br />If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not move segments to decommissioning servers, effectively putting them in a type of "maintenance" mode. In this case, decommissioning servers do not participate in balancing or assignment by load rules. The Coordinator still considers segments on decommissioning servers as candidates to replicate on active servers.<br /><br />Decommissioning can stall if there are no available active servers to move the segments to. You can use the maximum percent of decommissioning segment movements to prioritize balancing or to decrease commissioning time to prevent active servers from being overloaded. The value must be between 0 and 100.|70|
|`pauseCoordination`|Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false|
|`maxNonPrimaryReplicantsToLoad`|The maximum number of replicas that can be assigned across all tiers in a single Coordinator run. This parameter serves the same purpose as `replicationThrottleLimit` except this limit applies at the cluster-level instead of per tier. The default value does not apply a limit to the number of replicas assigned per coordination cycle. If you want to use a non-default value for this property, you may want to start with `~20%` of the number of segments found on the historical server with the most segments. Use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this property impact your Coordinator execution time.|`Integer.MAX_VALUE` (no limit)|
##### Smart segment loading
@ -989,6 +989,7 @@ Druid computes the values to optimize Coordinator performance, based on the curr
|`maxNonPrimaryReplicantsToLoad`|`Integer.MAX_VALUE` (no limit)|This throttling is already handled by `replicationThrottleLimit`.|
|`maxSegmentsToMove`|2% of used segments, minimum value 100, maximum value 1000|Ensures that some segments are always moving in the cluster to keep it well balanced. The maximum value keeps the Coordinator run times bounded.|
|`decommissioningMaxPercentOfMaxSegmentsToMove`|100|Prioritizes the move of segments from decommissioning servers so that they can be terminated quickly.|
|`balancerComputeThreads`|`num_cores` / 2|Ensures that there are enough threads to perform balancing computations without hogging all Coordinator resources.|
When `smartSegmentLoading` is disabled, Druid uses the configured values of these properties.
Disable `smartSegmentLoading` only if you want to explicitly set the values of any of the above properties.
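
For illustration, a rough sketch of the computations described in the table above and in the `SegmentLoadingConfig` change later in this diff (the helper names here are made up for the example; the actual implementation may differ in detail):

```java
public class SmartSegmentLoadingSketch
{
  /** maxSegmentsToMove: 2% of used segments, clamped to the range [100, 1000]. */
  static int maxSegmentsToMove(int numUsedSegments)
  {
    return Math.min(1_000, Math.max(100, numUsedSegments * 2 / 100));
  }

  /** replicationThrottleLimit: 2% of used segments, with a lower bound of 100. */
  static int replicationThrottleLimit(int numUsedSegments)
  {
    return Math.max(100, numUsedSegments * 2 / 100);
  }

  /** balancerComputeThreads: half the Coordinator's cores, at least 1. */
  static int balancerComputeThreads(int numCores)
  {
    return Math.max(1, numCores / 2);
  }
}
```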

View File

@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -516,6 +517,15 @@ public class CoordinatorDynamicConfig
return new Builder();
}
/**
* Returns a value of {@code (num processors / 2)} to ensure that balancing
* computations do not hog all Coordinator resources.
*/
public static int getDefaultBalancerComputeThreads()
{
return Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2);
}
private static class Defaults
{
static final long LEADING_MILLIS_BEFORE_MARK_UNUSED = TimeUnit.MINUTES.toMillis(15);
@ -524,8 +534,6 @@ public class CoordinatorDynamicConfig
static final int MAX_SEGMENTS_TO_MOVE = 100;
static final int REPLICANT_LIFETIME = 15;
static final int REPLICATION_THROTTLE_LIMIT = 500;
static final int BALANCER_COMPUTE_THREADS = 1;
static final boolean EMIT_BALANCING_STATS = false;
static final int MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 500;
static final int DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
static final boolean PAUSE_COORDINATION = false;
@ -746,7 +754,7 @@ public class CoordinatorDynamicConfig
valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
valueOrDefault(replicationThrottleLimit, Defaults.REPLICATION_THROTTLE_LIMIT),
valueOrDefault(balancerComputeThreads, Defaults.BALANCER_COMPUTE_THREADS),
valueOrDefault(balancerComputeThreads, getDefaultBalancerComputeThreads()),
specificDataSourcesToKillUnusedSegmentsIn,
valueOrDefault(killTaskSlotRatio, Defaults.KILL_TASK_SLOT_RATIO),
valueOrDefault(maxKillTaskSlots, Defaults.MAX_KILL_TASK_SLOTS),

View File

@ -22,7 +22,6 @@ package org.apache.druid.server.coordinator.loading;
import com.google.inject.Inject;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
@ -36,17 +35,14 @@ public class SegmentLoadQueueManager
private final LoadQueueTaskMaster taskMaster;
private final ServerInventoryView serverInventoryView;
private final SegmentsMetadataManager segmentsMetadataManager;
@Inject
public SegmentLoadQueueManager(
ServerInventoryView serverInventoryView,
SegmentsMetadataManager segmentsMetadataManager,
LoadQueueTaskMaster taskMaster
)
{
this.serverInventoryView = serverInventoryView;
this.segmentsMetadataManager = segmentsMetadataManager;
this.taskMaster = taskMaster;
}
@ -148,12 +144,4 @@ public class SegmentLoadQueueManager
return true;
}
/**
* Marks the given segment as unused.
*/
public boolean deleteSegment(DataSegment segment)
{
return segmentsMetadataManager.markSegmentAsUnused(segment.getId());
}
}

View File

@ -50,12 +50,9 @@ public class SegmentLoadingConfig
// Compute replicationThrottleLimit with a lower bound of 100
final int throttlePercentage = 2;
final int replicationThrottleLimit = Math.max(100, numUsedSegments * throttlePercentage / 100);
final int balancerComputeThreads = computeNumBalancerThreads(numUsedSegments);
log.info(
"Smart segment loading is enabled. Calculated balancerComputeThreads[%d]"
+ " and replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
balancerComputeThreads, replicationThrottleLimit, throttlePercentage, numUsedSegments
);
log.info(
"Smart segment loading is enabled. Calculated replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
replicationThrottleLimit, throttlePercentage, numUsedSegments
);
return new SegmentLoadingConfig(
@ -64,7 +61,7 @@ public class SegmentLoadingConfig
Integer.MAX_VALUE,
60,
true,
balancerComputeThreads
CoordinatorDynamicConfig.getDefaultBalancerComputeThreads()
);
} else {
// Use the configured values
@ -125,31 +122,4 @@ public class SegmentLoadingConfig
{
return balancerComputeThreads;
}
/**
* Computes the number of threads to be used in the balancing executor.
* The number of used segments in a cluster is generally a good indicator of
* the cluster size and has been used here as a proxy for the actual number of
* segments that would be involved in cost computations.
* <p>
* The number of threads increases by 1 first for every 50k segments, then for
* every 75k segments and so on.
*
* @return Number of {@code balancerComputeThreads} in the range [1, 8].
*/
public static int computeNumBalancerThreads(int numUsedSegments)
{
// Add an extra thread when numUsedSegments increases by a step
final int[] stepValues = {50, 50, 75, 75, 100, 100, 150, 150};
int remainder = numUsedSegments / 1000;
for (int step = 0; step < stepValues.length; ++step) {
remainder -= stepValues[step];
if (remainder < 0) {
return step + 1;
}
}
return stepValues.length;
}
}

View File

@ -67,7 +67,7 @@ public class BalanceSegmentsProfiler
@Before
public void setUp()
{
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
loadQueueManager = new SegmentLoadQueueManager(null, null);
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
emitter = EasyMock.createMock(ServiceEmitter.class);

View File

@ -39,7 +39,6 @@ import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@ -81,7 +80,6 @@ import java.util.concurrent.TimeUnit;
*/
public class CuratorDruidCoordinatorTest extends CuratorTestBase
{
private SegmentsMetadataManager segmentsMetadataManager;
private DataSourcesSnapshot dataSourcesSnapshot;
private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
@ -121,7 +119,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
@Before
public void setUp() throws Exception
{
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
@ -294,8 +291,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(sourceSegments.get(2));
EasyMock.replay(druidDataSource);
EasyMock.expect(segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(EasyMock.anyString()))
.andReturn(druidDataSource);
EasyMock.expect(coordinatorRuntimeParams.getDataSourcesSnapshot())
.andReturn(dataSourcesSnapshot).anyTimes();
final CoordinatorDynamicConfig dynamicConfig =
@ -322,7 +317,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
EasyMock.expect(coordinatorRuntimeParams.getBalancerStrategy())
.andReturn(balancerStrategy).anyTimes();
EasyMock.expect(coordinatorRuntimeParams.getDruidCluster()).andReturn(cluster).anyTimes();
EasyMock.replay(segmentsMetadataManager, coordinatorRuntimeParams, balancerStrategy);
EasyMock.replay(coordinatorRuntimeParams, balancerStrategy);
EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString()))
.andReturn(druidDataSource).anyTimes();
@ -334,7 +329,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
// Move the segment from source to dest
SegmentLoadQueueManager loadQueueManager =
new SegmentLoadQueueManager(baseView, segmentsMetadataManager, taskMaster);
new SegmentLoadQueueManager(baseView, taskMaster);
StrategicSegmentAssigner segmentAssigner = createSegmentAssigner(loadQueueManager, coordinatorRuntimeParams);
segmentAssigner.moveSegment(
segmentToMove,

View File

@ -178,7 +178,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
new SegmentLoadQueueManager(serverInventoryView, segmentsMetadataManager, loadQueueTaskMaster),
new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
new HashSet<>(),
@ -789,7 +789,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
scheduledExecutorFactory,
null,
loadQueueTaskMaster,
new SegmentLoadQueueManager(serverInventoryView, segmentsMetadataManager, loadQueueTaskMaster),
new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
druidNode,
new HashSet<>(),

View File

@ -24,7 +24,6 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
@ -112,21 +111,6 @@ public class SegmentToMoveCalculatorTest
Assert.assertEquals(1_000, computeMaxSegmentsToMove(10_000_000, 8));
}
@Test
public void testMaxSegmentsToMoveWithComputedNumThreads()
{
Assert.assertEquals(1_900, computeNumThreadsAndMaxToMove(10_000));
Assert.assertEquals(9_700, computeNumThreadsAndMaxToMove(50_000));
Assert.assertEquals(19_500, computeNumThreadsAndMaxToMove(100_000));
Assert.assertEquals(39_000, computeNumThreadsAndMaxToMove(200_000));
Assert.assertEquals(29_000, computeNumThreadsAndMaxToMove(500_000));
Assert.assertEquals(16_000, computeNumThreadsAndMaxToMove(1_000_000));
Assert.assertEquals(8_000, computeNumThreadsAndMaxToMove(2_000_000));
Assert.assertEquals(1_000, computeNumThreadsAndMaxToMove(10_000_000));
}
@Test
public void testMinSegmentsToMove()
{
@ -237,15 +221,6 @@ public class SegmentToMoveCalculatorTest
return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegments, 1, coordinatorPeriod);
}
private static int computeNumThreadsAndMaxToMove(int totalSegments)
{
return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
totalSegments,
SegmentLoadingConfig.computeNumBalancerThreads(totalSegments),
DEFAULT_COORDINATOR_PERIOD
);
}
private static int computeMinSegmentsToMove(int totalSegmentsInTier)
{
return SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(totalSegmentsInTier);

View File

@ -72,7 +72,7 @@ public class BalanceSegmentsTest
@Before
public void setUp()
{
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
loadQueueManager = new SegmentLoadQueueManager(null, null);
// Create test segments for multiple datasources
final DateTime start1 = DateTimes.of("2012-01-01");

View File

@ -50,7 +50,7 @@ public class CollectSegmentAndServerStatsTest
.withDruidCluster(DruidCluster.EMPTY)
.withUsedSegmentsInTest()
.withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null))
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
.build();
Mockito.when(mockTaskMaster.getAllPeons())

View File

@ -99,7 +99,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest
CoordinatorDynamicConfig.builder().withMarkSegmentAsUnusedDelayMillis(0).build()
)
.withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null))
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
.build();
SegmentTimeline timeline = segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()

View File

@ -33,7 +33,6 @@ import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@ -83,7 +82,6 @@ public class RunRulesTest
private RunRules ruleRunner;
private StubServiceEmitter emitter;
private MetadataRuleManager databaseRuleManager;
private SegmentsMetadataManager segmentsMetadataManager;
private SegmentLoadQueueManager loadQueueManager;
private final List<DataSegment> usedSegments =
CreateDataSegments.ofDatasource(DATASOURCE)
@ -101,9 +99,8 @@ public class RunRulesTest
emitter = new StubServiceEmitter("coordinator", "host");
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
ruleRunner = new RunRules(Set::size);
loadQueueManager = new SegmentLoadQueueManager(null, segmentsMetadataManager, null);
loadQueueManager = new SegmentLoadQueueManager(null, null);
balancerExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
}
@ -535,10 +532,6 @@ public class RunRulesTest
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(segmentsMetadataManager.markSegmentAsUnused(EasyMock.anyObject()))
.andReturn(true).anyTimes();
EasyMock.replay(segmentsMetadataManager);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(
@ -587,7 +580,7 @@ public class RunRulesTest
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
EasyMock.replay(databaseRuleManager);
DruidServer server1 = createHistorical("serverNorm", "normal");
server1.addDataSegment(usedSegments.get(0));
@ -644,7 +637,7 @@ public class RunRulesTest
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
EasyMock.replay(databaseRuleManager);
DruidServer server1 = createHistorical("server1", "hot");
server1.addDataSegment(usedSegments.get(0));
@ -688,7 +681,7 @@ public class RunRulesTest
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
EasyMock.replay(databaseRuleManager);
DruidServer server1 = createHistorical("server1", "hot");
DruidServer server2 = createHistorical("serverNorm2", "normal");
@ -856,9 +849,6 @@ public class RunRulesTest
@Test
public void testReplicantThrottleAcrossTiers()
{
EasyMock.expect(segmentsMetadataManager.markSegmentAsUnused(EasyMock.anyObject()))
.andReturn(true).anyTimes();
EasyMock.replay(segmentsMetadataManager);
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();

View File

@ -84,7 +84,7 @@ public class UnloadUnusedSegmentsTest
brokerServer = EasyMock.createMock(ImmutableDruidServer.class);
indexerServer = EasyMock.createMock(ImmutableDruidServer.class);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
loadQueueManager = new SegmentLoadQueueManager(null, null);
DateTime start1 = DateTimes.of("2012-01-01");
DateTime start2 = DateTimes.of("2012-02-01");

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.loading;
import org.junit.Assert;
import org.junit.Test;
public class SegmentLoadingConfigTest
{
@Test
public void testComputeNumBalancerThreads()
{
Assert.assertEquals(1, computeBalancerThreads(0));
Assert.assertEquals(1, computeBalancerThreads(30_000));
Assert.assertEquals(2, computeBalancerThreads(50_000));
Assert.assertEquals(3, computeBalancerThreads(100_000));
Assert.assertEquals(4, computeBalancerThreads(175_000));
Assert.assertEquals(5, computeBalancerThreads(250_000));
Assert.assertEquals(6, computeBalancerThreads(350_000));
Assert.assertEquals(7, computeBalancerThreads(450_000));
Assert.assertEquals(8, computeBalancerThreads(600_000));
Assert.assertEquals(8, computeBalancerThreads(1_000_000));
Assert.assertEquals(8, computeBalancerThreads(10_000_000));
}
private int computeBalancerThreads(int numUsedSegments)
{
return SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);
}
}

View File

@ -238,7 +238,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withUsedSegmentsInTest(usedSegments)
.withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null))
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
.build();
}

View File

@ -91,7 +91,7 @@ public class LoadRuleTest
{
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d"));
balancerStrategy = new CostBalancerStrategy(exec);
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
loadQueueManager = new SegmentLoadQueueManager(null, null);
}
@After

View File

@ -489,7 +489,7 @@ public class CoordinatorSimulationBuilder
null
);
this.loadQueueManager =
new SegmentLoadQueueManager(coordinatorInventoryView, segmentManager, loadQueueTaskMaster);
new SegmentLoadQueueManager(coordinatorInventoryView, loadQueueTaskMaster);
this.jacksonConfigManager = mockConfigManager();
setDynamicConfig(dynamicConfig);

View File

@ -24,11 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.utils.JvmUtils;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Set;
/**
@ -631,7 +631,7 @@ public class CoordinatorDynamicConfigTest
100,
15,
500,
1,
getDefaultNumBalancerThreads(),
emptyList,
1.0,
Integer.MAX_VALUE,
@ -661,7 +661,7 @@ public class CoordinatorDynamicConfigTest
100,
15,
500,
1,
getDefaultNumBalancerThreads(),
ImmutableSet.of("DATASOURCE"),
1.0,
Integer.MAX_VALUE,
@ -792,4 +792,9 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout());
Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad());
}
private static int getDefaultNumBalancerThreads()
{
return Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2);
}
}