mirror of https://github.com/apache/druid.git
Merge pull request #2346 from himanshug/fix_bug
Fixes bug in kill task scheduling DruidCoordinatorSegmentKiller.findIntervalForKillTask()
This commit is contained in:
commit
ef8e78ca98
|
@ -20,7 +20,6 @@
|
|||
package io.druid.server.coordinator;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -65,6 +64,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
|
|||
import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
|
||||
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
|
||||
import io.druid.server.coordinator.rules.LoadRule;
|
||||
import io.druid.server.coordinator.rules.Rule;
|
||||
|
@ -79,7 +79,6 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
@ -692,106 +691,6 @@ public class DruidCoordinator
|
|||
return ImmutableList.copyOf(helpers);
|
||||
}
|
||||
|
||||
static class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper
|
||||
{
|
||||
|
||||
private final long period;
|
||||
private final long retainDuration;
|
||||
private final int maxSegmentsToKill;
|
||||
private long lastKillTime = 0;
|
||||
|
||||
|
||||
private final MetadataSegmentManager segmentManager;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
DruidCoordinatorSegmentKiller(
|
||||
MetadataSegmentManager segmentManager,
|
||||
IndexingServiceClient indexingServiceClient,
|
||||
Duration retainDuration,
|
||||
Duration period,
|
||||
int maxSegmentsToKill
|
||||
)
|
||||
{
|
||||
this.period = period.getMillis();
|
||||
Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0");
|
||||
|
||||
this.retainDuration = retainDuration.getMillis();
|
||||
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0");
|
||||
|
||||
this.maxSegmentsToKill = maxSegmentsToKill;
|
||||
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
|
||||
|
||||
log.info(
|
||||
"Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]",
|
||||
this.period,
|
||||
this.retainDuration,
|
||||
this.maxSegmentsToKill
|
||||
);
|
||||
|
||||
this.segmentManager = segmentManager;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
Set<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
|
||||
|
||||
if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) {
|
||||
lastKillTime = System.currentTimeMillis();
|
||||
|
||||
for (String dataSource : whitelist) {
|
||||
final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill);
|
||||
if (intervalToKill != null) {
|
||||
try {
|
||||
indexingServiceClient.killSegments(dataSource, intervalToKill);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
log.warn("skipping kill task scheduling because thread is interrupted.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
private Interval findIntervalForKillTask(String dataSource, int limit)
|
||||
{
|
||||
List<Interval> unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals(
|
||||
dataSource,
|
||||
new Interval(
|
||||
0,
|
||||
System.currentTimeMillis()
|
||||
- retainDuration
|
||||
),
|
||||
limit
|
||||
);
|
||||
|
||||
if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
|
||||
long start = Long.MIN_VALUE;
|
||||
long end = Long.MAX_VALUE;
|
||||
|
||||
for (Interval interval : unusedSegmentIntervals) {
|
||||
if (start < interval.getStartMillis()) {
|
||||
start = interval.getStartMillis();
|
||||
}
|
||||
|
||||
if (end > interval.getEndMillis()) {
|
||||
end = interval.getEndMillis();
|
||||
}
|
||||
}
|
||||
|
||||
return new Interval(start, end);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper
|
||||
{
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
|
|
@ -25,5 +25,12 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
|||
*/
|
||||
public interface DruidCoordinatorHelper
|
||||
{
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params);
|
||||
/**
|
||||
* Implementations of this method run various activities performed by the coordinator.
|
||||
* Input params can be used and modified. They are typically in a list and returned
|
||||
* DruidCoordinatorRuntimeParams is passed to the next helper.
|
||||
* @param params
|
||||
* @return same as input or a modified value to be used by next helper.
|
||||
*/
|
||||
DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.server.coordinator.helper;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.client.indexing.IndexingServiceClient;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.metadata.MetadataSegmentManager;
|
||||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper
|
||||
{
|
||||
private final static Logger log = new Logger(DruidCoordinatorSegmentKiller.class);
|
||||
|
||||
private final long period;
|
||||
private final long retainDuration;
|
||||
private final int maxSegmentsToKill;
|
||||
private long lastKillTime = 0;
|
||||
|
||||
|
||||
private final MetadataSegmentManager segmentManager;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
public DruidCoordinatorSegmentKiller(
|
||||
MetadataSegmentManager segmentManager,
|
||||
IndexingServiceClient indexingServiceClient,
|
||||
Duration retainDuration,
|
||||
Duration period,
|
||||
int maxSegmentsToKill
|
||||
)
|
||||
{
|
||||
this.period = period.getMillis();
|
||||
Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0");
|
||||
|
||||
this.retainDuration = retainDuration.getMillis();
|
||||
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0");
|
||||
|
||||
this.maxSegmentsToKill = maxSegmentsToKill;
|
||||
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
|
||||
|
||||
log.info(
|
||||
"Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]",
|
||||
this.period,
|
||||
this.retainDuration,
|
||||
this.maxSegmentsToKill
|
||||
);
|
||||
|
||||
this.segmentManager = segmentManager;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
Set<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
|
||||
|
||||
if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) {
|
||||
lastKillTime = System.currentTimeMillis();
|
||||
|
||||
for (String dataSource : whitelist) {
|
||||
final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill);
|
||||
if (intervalToKill != null) {
|
||||
try {
|
||||
indexingServiceClient.killSegments(dataSource, intervalToKill);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
log.warn("skipping kill task scheduling because thread is interrupted.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Interval findIntervalForKillTask(String dataSource, int limit)
|
||||
{
|
||||
List<Interval> unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals(
|
||||
dataSource,
|
||||
new Interval(
|
||||
0,
|
||||
System.currentTimeMillis()
|
||||
- retainDuration
|
||||
),
|
||||
limit
|
||||
);
|
||||
|
||||
if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
|
||||
return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.server.coordinator.helper;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.client.indexing.IndexingServiceClient;
|
||||
import io.druid.metadata.MetadataSegmentManager;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DruidCoordinatorSegmentKillerTest
|
||||
{
|
||||
@Test
|
||||
public void testFindIntervalForKillTask()
|
||||
{
|
||||
testFindIntervalForKillTask(null, null);
|
||||
testFindIntervalForKillTask(ImmutableList.<Interval>of(), null);
|
||||
|
||||
testFindIntervalForKillTask(ImmutableList.<Interval>of(Interval.parse("2014/2015")), Interval.parse("2014/2015"));
|
||||
|
||||
testFindIntervalForKillTask(
|
||||
ImmutableList.<Interval>of(Interval.parse("2014/2015"), Interval.parse("2016/2017")),
|
||||
Interval.parse("2014/2017")
|
||||
);
|
||||
|
||||
testFindIntervalForKillTask(
|
||||
ImmutableList.<Interval>of(Interval.parse("2014/2015"), Interval.parse("2015/2016")),
|
||||
Interval.parse("2014/2016")
|
||||
);
|
||||
|
||||
testFindIntervalForKillTask(
|
||||
ImmutableList.<Interval>of(Interval.parse("2015/2016"), Interval.parse("2014/2015")),
|
||||
Interval.parse("2014/2016")
|
||||
);
|
||||
|
||||
testFindIntervalForKillTask(
|
||||
ImmutableList.<Interval>of(Interval.parse("2015/2017"), Interval.parse("2014/2016")),
|
||||
Interval.parse("2014/2017")
|
||||
);
|
||||
|
||||
testFindIntervalForKillTask(
|
||||
ImmutableList.<Interval>of(
|
||||
Interval.parse("2015/2019"),
|
||||
Interval.parse("2014/2016"),
|
||||
Interval.parse("2018/2020")
|
||||
),
|
||||
Interval.parse("2014/2020")
|
||||
);
|
||||
|
||||
testFindIntervalForKillTask(
|
||||
ImmutableList.<Interval>of(
|
||||
Interval.parse("2015/2019"),
|
||||
Interval.parse("2014/2016"),
|
||||
Interval.parse("2018/2020"),
|
||||
Interval.parse("2021/2022")
|
||||
),
|
||||
Interval.parse("2014/2022")
|
||||
);
|
||||
}
|
||||
|
||||
private void testFindIntervalForKillTask(List<Interval> segmentManagerResult, Interval expected)
|
||||
{
|
||||
MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
|
||||
EasyMock.expect(
|
||||
segmentManager.getUnusedSegmentIntervals(
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyObject(Interval.class),
|
||||
EasyMock.anyInt()
|
||||
)
|
||||
).andReturn(
|
||||
segmentManagerResult
|
||||
);
|
||||
EasyMock.replay(segmentManager);
|
||||
IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class);
|
||||
|
||||
DruidCoordinatorSegmentKiller coordinatorSegmentKiller = new DruidCoordinatorSegmentKiller(
|
||||
segmentManager,
|
||||
indexingServiceClient,
|
||||
Duration.parse("PT86400S"),
|
||||
Duration.parse("PT86400S"),
|
||||
1000
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
expected,
|
||||
coordinatorSegmentKiller.findIntervalForKillTask("test", 10000)
|
||||
);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue