From ab3edfa8fc500dc0eb8683f576880fa80cd53e20 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 27 Jan 2016 10:27:24 -0600 Subject: [PATCH 1/2] moving DruidCoordinatorSegmentKiller class out of DruidCoordinator --- .../server/coordinator/DruidCoordinator.java | 103 +------------- .../helper/DruidCoordinatorSegmentKiller.java | 134 ++++++++++++++++++ 2 files changed, 135 insertions(+), 102 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index e50280c2e9d..748a8522a6b 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -20,7 +20,6 @@ package io.druid.server.coordinator; import com.google.common.base.Function; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -65,6 +64,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorLogger; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader; +import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; @@ -79,7 +79,6 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; @@ -692,106 +691,6 @@ public class DruidCoordinator return ImmutableList.copyOf(helpers); } - static class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper - { - - private final long period; - private final long retainDuration; - private final int maxSegmentsToKill; - private long lastKillTime = 0; - - - private final MetadataSegmentManager segmentManager; - private final IndexingServiceClient indexingServiceClient; - - DruidCoordinatorSegmentKiller( - MetadataSegmentManager segmentManager, - IndexingServiceClient indexingServiceClient, - Duration retainDuration, - Duration period, - int maxSegmentsToKill - ) - { - this.period = period.getMillis(); - Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0"); - - this.retainDuration = retainDuration.getMillis(); - Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0"); - - this.maxSegmentsToKill = maxSegmentsToKill; - Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); - - log.info( - "Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]", - this.period, - this.retainDuration, - this.maxSegmentsToKill - ); - - this.segmentManager = segmentManager; - this.indexingServiceClient = indexingServiceClient; - } - - @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) - { - Set whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist(); - - if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - for (String dataSource : whitelist) { - final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill); - if (intervalToKill != null) { - try { - indexingServiceClient.killSegments(dataSource, intervalToKill); - } - catch (Exception ex) { - log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); - if (Thread.currentThread().isInterrupted()) { - log.warn("skipping kill task scheduling because thread is interrupted."); - break; - } - } - } - } - } - return params; - } - - private Interval findIntervalForKillTask(String dataSource, int limit) - { - List unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals( - dataSource, - new Interval( - 0, - System.currentTimeMillis() - - retainDuration - ), - limit - ); - - if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) { - long start = Long.MIN_VALUE; - long end = Long.MAX_VALUE; - - for (Interval interval : unusedSegmentIntervals) { - if (start < interval.getStartMillis()) { - start = interval.getStartMillis(); - } - - if (end > interval.getEndMillis()) { - end = interval.getEndMillis(); - } - } - - return new Interval(start, end); - } else { - return null; - } - } - } - public static class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper { private final IndexingServiceClient indexingServiceClient; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java new file mode 100644 index 00000000000..2a3ff15e4b2 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java @@ -0,0 +1,134 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.helper; + +import com.google.common.base.Preconditions; +import com.metamx.common.logger.Logger; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.metadata.MetadataSegmentManager; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Set; + +/** + */ +public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper +{ + private final static Logger log = new Logger(DruidCoordinatorSegmentKiller.class); + + private final long period; + private final long retainDuration; + private final int maxSegmentsToKill; + private long lastKillTime = 0; + + + private final MetadataSegmentManager segmentManager; + private final IndexingServiceClient indexingServiceClient; + + public DruidCoordinatorSegmentKiller( + MetadataSegmentManager segmentManager, + IndexingServiceClient indexingServiceClient, + Duration retainDuration, + Duration period, + int maxSegmentsToKill + ) + { + this.period = period.getMillis(); + Preconditions.checkArgument(this.period > 0, "coordinator kill period must be > 0"); + + this.retainDuration = retainDuration.getMillis(); + Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0"); + + this.maxSegmentsToKill = maxSegmentsToKill; + Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); + + log.info( + "Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]", + this.period, + this.retainDuration, + this.maxSegmentsToKill + ); + + this.segmentManager = segmentManager; + this.indexingServiceClient = indexingServiceClient; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + Set whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist(); + + if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) { + lastKillTime = System.currentTimeMillis(); + + for (String dataSource : whitelist) { + final Interval intervalToKill = findIntervalForKillTask(dataSource, maxSegmentsToKill); + if (intervalToKill != null) { + try { + indexingServiceClient.killSegments(dataSource, intervalToKill); + } + catch (Exception ex) { + log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); + if (Thread.currentThread().isInterrupted()) { + log.warn("skipping kill task scheduling because thread is interrupted."); + break; + } + } + } + } + } + return params; + } + + private Interval findIntervalForKillTask(String dataSource, int limit) + { + List unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals( + dataSource, + new Interval( + 0, + System.currentTimeMillis() + - retainDuration + ), + limit + ); + + if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) { + long start = Long.MIN_VALUE; + long end = Long.MAX_VALUE; + + for (Interval interval : unusedSegmentIntervals) { + if (start < interval.getStartMillis()) { + start = interval.getStartMillis(); + } + + if (end > interval.getEndMillis()) { + end = interval.getEndMillis(); + } + } + + return new Interval(start, end); + } else { + return null; + } + } +} From f6b4dbd6975d3378266a07da07d00abfbcbf15c9 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 27 Jan 2016 12:43:23 -0600 Subject: [PATCH 2/2] bug fix and unit tests for DruidCoordinatorSegmentKiller --- .../helper/DruidCoordinatorHelper.java | 9 +- .../helper/DruidCoordinatorSegmentKiller.java | 20 +--- .../DruidCoordinatorSegmentKillerTest.java | 113 ++++++++++++++++++ 3 files changed, 126 insertions(+), 16 deletions(-) create mode 100644 server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java index 3c9225a5ead..852334b2a69 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHelper.java @@ -25,5 +25,12 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; */ public interface DruidCoordinatorHelper { - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params); + /** + * Implementations of this method run various activities performed by the coordinator. + * Input params can be used and modified. They are typically in a list and returned + * DruidCoordinatorRuntimeParams is passed to the next helper. + * @param params + * @return same as input or a modified value to be used by next helper. + */ + DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params); } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java index 2a3ff15e4b2..aa9333552fc 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKiller.java @@ -19,9 +19,11 @@ package io.druid.server.coordinator.helper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.metamx.common.logger.Logger; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.common.utils.JodaUtils; import io.druid.metadata.MetadataSegmentManager; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.joda.time.Duration; @@ -100,7 +102,8 @@ public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper return params; } - private Interval findIntervalForKillTask(String dataSource, int limit) + @VisibleForTesting + Interval findIntervalForKillTask(String dataSource, int limit) { List unusedSegmentIntervals = segmentManager.getUnusedSegmentIntervals( dataSource, @@ -113,20 +116,7 @@ public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper ); if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) { - long start = Long.MIN_VALUE; - long end = Long.MAX_VALUE; - - for (Interval interval : unusedSegmentIntervals) { - if (start < interval.getStartMillis()) { - start = interval.getStartMillis(); - } - - if (end > interval.getEndMillis()) { - end = interval.getEndMillis(); - } - } - - return new Interval(start, end); + return JodaUtils.umbrellaInterval(unusedSegmentIntervals); } else { return null; } diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java new file mode 100644 index 00000000000..819446dcaea --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.helper; + +import com.google.common.collect.ImmutableList; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.metadata.MetadataSegmentManager; +import org.easymock.EasyMock; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DruidCoordinatorSegmentKillerTest +{ + @Test + public void testFindIntervalForKillTask() + { + testFindIntervalForKillTask(null, null); + testFindIntervalForKillTask(ImmutableList.of(), null); + + testFindIntervalForKillTask(ImmutableList.of(Interval.parse("2014/2015")), Interval.parse("2014/2015")); + + testFindIntervalForKillTask( + ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017")), + Interval.parse("2014/2017") + ); + + testFindIntervalForKillTask( + ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2015/2016")), + Interval.parse("2014/2016") + ); + + testFindIntervalForKillTask( + ImmutableList.of(Interval.parse("2015/2016"), Interval.parse("2014/2015")), + Interval.parse("2014/2016") + ); + + testFindIntervalForKillTask( + ImmutableList.of(Interval.parse("2015/2017"), Interval.parse("2014/2016")), + Interval.parse("2014/2017") + ); + + testFindIntervalForKillTask( + ImmutableList.of( + Interval.parse("2015/2019"), + Interval.parse("2014/2016"), + Interval.parse("2018/2020") + ), + Interval.parse("2014/2020") + ); + + testFindIntervalForKillTask( + ImmutableList.of( + Interval.parse("2015/2019"), + Interval.parse("2014/2016"), + Interval.parse("2018/2020"), + Interval.parse("2021/2022") + ), + Interval.parse("2014/2022") + ); + } + + private void testFindIntervalForKillTask(List segmentManagerResult, Interval expected) + { + MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class); + EasyMock.expect( + segmentManager.getUnusedSegmentIntervals( + EasyMock.anyString(), + EasyMock.anyObject(Interval.class), + EasyMock.anyInt() + ) + ).andReturn( + segmentManagerResult + ); + EasyMock.replay(segmentManager); + IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class); + + DruidCoordinatorSegmentKiller coordinatorSegmentKiller = new DruidCoordinatorSegmentKiller( + segmentManager, + indexingServiceClient, + Duration.parse("PT86400S"), + Duration.parse("PT86400S"), + 1000 + ); + + Assert.assertEquals( + expected, + coordinatorSegmentKiller.findIntervalForKillTask("test", 10000) + ); + } +}