From 70660ad295d745d41a58de9ee0148813d4e1209c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Aug 2014 14:06:52 -0700 Subject: [PATCH] separate cleanup of overshadowed vs. unavailable segments --- .../server/coordinator/DruidCoordinator.java | 7 +- ... => DruidCoordinatorCleanupAvailable.java} | 45 +-------- .../DruidCoordinatorCleanupOvershadowed.java | 92 +++++++++++++++++++ 3 files changed, 100 insertions(+), 44 deletions(-) rename server/src/main/java/io/druid/server/coordinator/helper/{DruidCoordinatorCleanup.java => DruidCoordinatorCleanupAvailable.java} (66%) create mode 100644 server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 6f099117246..20a452f9abc 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -57,7 +57,8 @@ import io.druid.guice.annotations.Self; import io.druid.segment.IndexIO; import io.druid.server.DruidNode; import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; -import io.druid.server.coordinator.helper.DruidCoordinatorCleanup; +import io.druid.server.coordinator.helper.DruidCoordinatorCleanupAvailable; +import io.druid.server.coordinator.helper.DruidCoordinatorCleanupOvershadowed; import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorLogger; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; @@ -77,7 +78,6 @@ import org.joda.time.Duration; import java.io.IOException; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -829,7 +829,8 @@ public class DruidCoordinator } }, new DruidCoordinatorRuleRunner(DruidCoordinator.this), - new DruidCoordinatorCleanup(DruidCoordinator.this), + new DruidCoordinatorCleanupAvailable(DruidCoordinator.this), + new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this), new DruidCoordinatorBalancer(DruidCoordinator.this), new DruidCoordinatorLogger() ), diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupAvailable.java similarity index 66% rename from server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java rename to server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupAvailable.java index 18c82f183c5..be3f7c3c003 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupAvailable.java @@ -23,8 +23,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.common.guava.Comparators; import com.metamx.common.logger.Logger; -import io.druid.client.DruidDataSource; -import io.druid.client.DruidServer; import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.server.coordinator.CoordinatorStats; @@ -43,13 +41,11 @@ import java.util.Set; /** */ -public class DruidCoordinatorCleanup implements DruidCoordinatorHelper +public class DruidCoordinatorCleanupAvailable implements DruidCoordinatorHelper { - private static final Logger log = new Logger(DruidCoordinatorCleanup.class); - private final DruidCoordinator coordinator; - public DruidCoordinatorCleanup( + public DruidCoordinatorCleanupAvailable( DruidCoordinator coordinator ) { @@ -95,43 +91,10 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper } } - // Delete segments that are old - // Unservice old partitions if we've had enough time to make sure we aren't flapping with old data - if (params.hasDeletionWaitTimeElapsed()) { - Map> timelines = Maps.newHashMap(); - - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { - for (ServerHolder serverHolder : serverHolders) { - ImmutableDruidServer server = serverHolder.getServer(); - - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { - VersionedIntervalTimeline timeline = timelines.get(dataSource.getName()); - if (timeline == null) { - timeline = new VersionedIntervalTimeline(Comparators.comparable()); - timelines.put(dataSource.getName(), timeline); - } - - for (DataSegment segment : dataSource.getSegments()) { - timeline.add( - segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment) - ); - } - } - } - } - - for (VersionedIntervalTimeline timeline : timelines.values()) { - for (TimelineObjectHolder holder : timeline.findOvershadowed()) { - for (DataSegment dataSegment : holder.getObject().payloads()) { - coordinator.removeSegment(dataSegment); - stats.addToGlobalStat("overShadowedCount", 1); - } - } - } - } - return params.buildFromExisting() .withCoordinatorStats(stats) .build(); } + + } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java new file mode 100644 index 00000000000..ec9126ac292 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java @@ -0,0 +1,92 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.coordinator.helper; + +import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; +import com.metamx.common.guava.Comparators; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCluster; +import io.druid.server.coordinator.DruidCoordinator; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.server.coordinator.ServerHolder; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; + +import java.util.Map; + +public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelper +{ + private final DruidCoordinator coordinator; + + public DruidCoordinatorCleanupOvershadowed(DruidCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + CoordinatorStats stats = new CoordinatorStats(); + + // Delete segments that are old + // Unservice old partitions if we've had enough time to make sure we aren't flapping with old data + if (params.hasDeletionWaitTimeElapsed()) { + DruidCluster cluster = params.getDruidCluster(); + Map> timelines = Maps.newHashMap(); + + for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (ServerHolder serverHolder : serverHolders) { + ImmutableDruidServer server = serverHolder.getServer(); + + for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + VersionedIntervalTimeline timeline = timelines.get(dataSource.getName()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline<>(Comparators.comparable()); + timelines.put(dataSource.getName(), timeline); + } + + for (DataSegment segment : dataSource.getSegments()) { + timeline.add( + segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment) + ); + } + } + } + } + + for (VersionedIntervalTimeline timeline : timelines.values()) { + for (TimelineObjectHolder holder : timeline.findOvershadowed()) { + for (DataSegment dataSegment : holder.getObject().payloads()) { + coordinator.removeSegment(dataSegment); + stats.addToGlobalStat("overShadowedCount", 1); + } + } + } + } + + return params.buildFromExisting() + .withCoordinatorStats(stats) + .build(); + } +}