diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index e50280c2e9d..748a8522a6b 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -20,7 +20,6 @@ package io.druid.server.coordinator; import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -65,6 +64,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorLogger; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; @@ -79,7 +79,6 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; @@ -692,106 +691,6 @@ public class DruidCoordinator return ImmutableList.copyOf(helpers); } - static class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper - { - - private final long period; - private final long retainDuration; - private final int maxSegmentsToKill; - private long lastKillTime = 0; - - - private final MetadataSegmentManager segmentManager; - private final IndexingServiceClient indexingServiceClient; - - DruidCoordinatorSegmentKiller( - MetadataSegmentManager segmentManager, - IndexingServiceClient indexingServiceClient, - Duration retainDuration, - Duration period, - int maxSegmentsToKill - ) - { - this.period = period.getMillis(); - Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0"); - - this.retainDuration = retainDuration.getMillis(); - Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0"); - - this.maxSegmentsToKill = maxSegmentsToKill; - Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); - - log.info( - "Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]", - this.period, - this.retainDuration, - this.maxSegmentsToKill - ); - - this.segmentManager = segmentManager; - this.indexingServiceClient = indexingServiceClient; - } - - @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) - { - Set whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist(); - - if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - for (String dataSource : whitelist) { - final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill); - if (intervalToKill != null) { - try { - indexingServiceClient.killSegments(dataSource, intervalToKill); - } - catch (Exception ex) { - log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); - if (Thread.currentThread().isInterrupted()) { - log.warn("skipping kill task scheduling because thread is interrupted."); - break; - } - } - } - } - } - return params; - } - - private Interval findIntervalForKillTask(String dataSource, int limit) - { - List unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals( - dataSource, - new Interval( - 0, - System.currentTimeMillis() - - retainDuration - ), - limit - ); - - if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) { - long start = Long.MIN_VALUE; - long end = Long.MAX_VALUE; - - for (Interval interval : unusedSegmentIntervals) { - if (start < interval.getStartMillis()) { - start = interval.getStartMillis(); - } - - if (end > interval.getEndMillis()) { - end = interval.getEndMillis(); - } - } - - return new Interval(start, end); - } else { - return null; - } - } - } - public static class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper { private final IndexingServiceClient indexingServiceClient; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java new file mode 100644 index 00000000000..2a3ff15e4b2 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java @@ -0,0 +1,134 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.helper; + +import com.google.common.base.Preconditions; +import com.metamx.common.logger.Logger; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.metadata.MetadataSegmentManager; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Set; + +/** + */ +public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper +{ + private final static Logger log = new Logger(DruidCoordinatorSegmentKiller.class); + + private final long period; + private final long retainDuration; + private final int maxSegmentsToKill; + private long lastKillTime = 0; + + + private final MetadataSegmentManager segmentManager; + private final IndexingServiceClient indexingServiceClient; + + public DruidCoordinatorSegmentKiller( + MetadataSegmentManager segmentManager, + IndexingServiceClient indexingServiceClient, + Duration retainDuration, + Duration period, + int maxSegmentsToKill + ) + { + this.period = period.getMillis(); + Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0"); + + this.retainDuration = retainDuration.getMillis(); + Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0"); + + this.maxSegmentsToKill = maxSegmentsToKill; + Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); + + log.info( + "Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]", + this.period, + this.retainDuration, + this.maxSegmentsToKill + ); + + this.segmentManager = segmentManager; + this.indexingServiceClient = indexingServiceClient; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + Set whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist(); + + if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) { + lastKillTime = System.currentTimeMillis(); + + for (String dataSource : whitelist) { + final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill); + if (intervalToKill != null) { + try { + indexingServiceClient.killSegments(dataSource, intervalToKill); + } + catch (Exception ex) { + log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); + if (Thread.currentThread().isInterrupted()) { + log.warn("skipping kill task scheduling because thread is interrupted."); + break; + } + } + } + } + } + return params; + } + + private Interval findIntervalForKillTask(String dataSource, int limit) + { + List unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals( + dataSource, + new Interval( + 0, + System.currentTimeMillis() + - retainDuration + ), + limit + ); + + if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) { + long start = Long.MIN_VALUE; + long end = Long.MAX_VALUE; + + for (Interval interval : unusedSegmentIntervals) { + if (start < interval.getStartMillis()) { + start = interval.getStartMillis(); + } + + if (end > interval.getEndMillis()) { + end = interval.getEndMillis(); + } + } + + return new Interval(start, end); + } else { + return null; + } + } +}