mirror of https://github.com/apache/druid.git
moving DruidCoordinatorSegmentKiller class out of DruidCoordinator
This commit is contained in:
parent
99017f4518
commit
ab3edfa8fc
|
@ -20,7 +20,6 @@
|
|||
package io.druid.server.coordinator;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -65,6 +64,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
|
|||
import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
|
||||
import io.druid.server.coordinator.rules.LoadRule;
|
||||
import io.druid.server.coordinator.rules.Rule;
|
||||
|
@ -79,7 +79,6 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
@ -692,106 +691,6 @@ public class DruidCoordinator
|
|||
return ImmutableList.copyOf(helpers);
|
||||
}
|
||||
|
||||
static class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper
|
||||
{
|
||||
|
||||
private final long period;
|
||||
private final long retainDuration;
|
||||
private final int maxSegmentsToKill;
|
||||
private long lastKillTime = 0;
|
||||
|
||||
|
||||
private final MetadataSegmentManager segmentManager;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
DruidCoordinatorSegmentKiller(
|
||||
MetadataSegmentManager segmentManager,
|
||||
IndexingServiceClient indexingServiceClient,
|
||||
Duration retainDuration,
|
||||
Duration period,
|
||||
int maxSegmentsToKill
|
||||
)
|
||||
{
|
||||
this.period = period.getMillis();
|
||||
Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0");
|
||||
|
||||
this.retainDuration = retainDuration.getMillis();
|
||||
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0");
|
||||
|
||||
this.maxSegmentsToKill = maxSegmentsToKill;
|
||||
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
|
||||
|
||||
log.info(
|
||||
"Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]",
|
||||
this.period,
|
||||
this.retainDuration,
|
||||
this.maxSegmentsToKill
|
||||
);
|
||||
|
||||
this.segmentManager = segmentManager;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
Set<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
|
||||
|
||||
if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) {
|
||||
lastKillTime = System.currentTimeMillis();
|
||||
|
||||
for (String dataSource : whitelist) {
|
||||
final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill);
|
||||
if (intervalToKill != null) {
|
||||
try {
|
||||
indexingServiceClient.killSegments(dataSource, intervalToKill);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
log.warn("skipping kill task scheduling because thread is interrupted.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
private Interval findIntervalForKillTask(String dataSource, int limit)
|
||||
{
|
||||
List<Interval> unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals(
|
||||
dataSource,
|
||||
new Interval(
|
||||
0,
|
||||
System.currentTimeMillis()
|
||||
- retainDuration
|
||||
),
|
||||
limit
|
||||
);
|
||||
|
||||
if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
|
||||
long start = Long.MIN_VALUE;
|
||||
long end = Long.MAX_VALUE;
|
||||
|
||||
for (Interval interval : unusedSegmentIntervals) {
|
||||
if (start < interval.getStartMillis()) {
|
||||
start = interval.getStartMillis();
|
||||
}
|
||||
|
||||
if (end > interval.getEndMillis()) {
|
||||
end = interval.getEndMillis();
|
||||
}
|
||||
}
|
||||
|
||||
return new Interval(start, end);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper
|
||||
{
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.server.coordinator.helper;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.client.indexing.IndexingServiceClient;
|
||||
import io.druid.metadata.MetadataSegmentManager;
|
||||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper
|
||||
{
|
||||
private final static Logger log = new Logger(DruidCoordinatorSegmentKiller.class);
|
||||
|
||||
private final long period;
|
||||
private final long retainDuration;
|
||||
private final int maxSegmentsToKill;
|
||||
private long lastKillTime = 0;
|
||||
|
||||
|
||||
private final MetadataSegmentManager segmentManager;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
public DruidCoordinatorSegmentKiller(
|
||||
MetadataSegmentManager segmentManager,
|
||||
IndexingServiceClient indexingServiceClient,
|
||||
Duration retainDuration,
|
||||
Duration period,
|
||||
int maxSegmentsToKill
|
||||
)
|
||||
{
|
||||
this.period = period.getMillis();
|
||||
Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0");
|
||||
|
||||
this.retainDuration = retainDuration.getMillis();
|
||||
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0");
|
||||
|
||||
this.maxSegmentsToKill = maxSegmentsToKill;
|
||||
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
|
||||
|
||||
log.info(
|
||||
"Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]",
|
||||
this.period,
|
||||
this.retainDuration,
|
||||
this.maxSegmentsToKill
|
||||
);
|
||||
|
||||
this.segmentManager = segmentManager;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
Set<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
|
||||
|
||||
if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) {
|
||||
lastKillTime = System.currentTimeMillis();
|
||||
|
||||
for (String dataSource : whitelist) {
|
||||
final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill);
|
||||
if (intervalToKill != null) {
|
||||
try {
|
||||
indexingServiceClient.killSegments(dataSource, intervalToKill);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
log.warn("skipping kill task scheduling because thread is interrupted.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
private Interval findIntervalForKillTask(String dataSource, int limit)
|
||||
{
|
||||
List<Interval> unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals(
|
||||
dataSource,
|
||||
new Interval(
|
||||
0,
|
||||
System.currentTimeMillis()
|
||||
- retainDuration
|
||||
),
|
||||
limit
|
||||
);
|
||||
|
||||
if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
|
||||
long start = Long.MIN_VALUE;
|
||||
long end = Long.MAX_VALUE;
|
||||
|
||||
for (Interval interval : unusedSegmentIntervals) {
|
||||
if (start < interval.getStartMillis()) {
|
||||
start = interval.getStartMillis();
|
||||
}
|
||||
|
||||
if (end > interval.getEndMillis()) {
|
||||
end = interval.getEndMillis();
|
||||
}
|
||||
}
|
||||
|
||||
return new Interval(start, end);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue