Merge pull request #661 from metamx/drop-remove-stuff

Coordinator drop/remove related stuff.
This commit is contained in:
fjy 2014-08-06 14:28:23 -06:00
commit 8b6f030ad5
3 changed files with 27 additions and 25 deletions

View File

@ -51,7 +51,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|
|`millisToWaitBeforeDeleting`|How long does the coordinator need to be active before it can start deleting segments.|90000 (15 mins)| |`millisToWaitBeforeDeleting`|How long does the coordinator need to be active before it can start removing (marking unused) segments in metadata storage.|900000 (15 mins)|
|`mergeBytesLimit`|The maximum number of bytes to merge (for segments).|524288000L| |`mergeBytesLimit`|The maximum number of bytes to merge (for segments).|524288000L|
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [merge task](Tasks.html).|100| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [merge task](Tasks.html).|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|

View File

@ -63,27 +63,31 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
Set<DataSegment> availableSegments = params.getAvailableSegments(); Set<DataSegment> availableSegments = params.getAvailableSegments();
DruidCluster cluster = params.getDruidCluster(); DruidCluster cluster = params.getDruidCluster();
// Drop segments that no longer exist in the available segments configuration // Drop segments that no longer exist in the available segments configuration, if it has been populated. (It might
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) { // not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
for (ServerHolder serverHolder : serverHolders) { // segments at all, we should have all of them.)
ImmutableDruidServer server = serverHolder.getServer(); if (!availableSegments.isEmpty()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
for (ImmutableDruidDataSource dataSource : server.getDataSources()) { for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
for (DataSegment segment : dataSource.getSegments()) { for (DataSegment segment : dataSource.getSegments()) {
if (!availableSegments.contains(segment)) { if (!availableSegments.contains(segment)) {
LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName()); LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
if (!queuePeon.getSegmentsToDrop().contains(segment)) { if (!queuePeon.getSegmentsToDrop().contains(segment)) {
queuePeon.dropSegment( queuePeon.dropSegment(
segment, new LoadPeonCallback() segment, new LoadPeonCallback()
{ {
@Override @Override
public void execute() public void execute()
{ {
} }
}
);
stats.addToTieredStat("unneededCount", server.getTier(), 1);
} }
);
stats.addToTieredStat("unneededCount", server.getTier(), 1);
} }
} }
} }

View File

@ -35,6 +35,7 @@ import org.joda.time.DateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* LoadRules indicate the number of replicants a segment should have in a given tier. * LoadRules indicate the number of replicants a segment should have in a given tier.
@ -48,7 +49,8 @@ public abstract class LoadRule implements Rule
@Override @Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment) public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
{ {
CoordinatorStats stats = new CoordinatorStats(); final CoordinatorStats stats = new CoordinatorStats();
final Set<DataSegment> availableSegments = params.getAvailableSegments();
final Map<String, Integer> loadStatus = Maps.newHashMap(); final Map<String, Integer> loadStatus = Maps.newHashMap();
@ -70,7 +72,7 @@ public abstract class LoadRule implements Rule
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue); final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) { if (availableSegments.contains(segment)) {
CoordinatorStats assignStats = assign( CoordinatorStats assignStats = assign(
params.getReplicationManager(), params.getReplicationManager(),
tier, tier,
@ -167,10 +169,6 @@ public abstract class LoadRule implements Rule
{ {
CoordinatorStats stats = new CoordinatorStats(); CoordinatorStats stats = new CoordinatorStats();
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
}
// Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything // Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) { for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) { if (leftToLoad > 0) {