From b313601a3b603527dbe43f62226de18d1f1c2229 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 19 Jun 2014 16:32:38 -0700 Subject: [PATCH 1/4] add an optional delay for how long it takes to drop a segment, fixes all manners of inconsistent issues --- .../druid/client/CachingClusteredClient.java | 2 +- .../segment/loading/SegmentLoaderConfig.java | 15 +++++++-- .../server/coordination/ZkCoordinator.java | 32 ++++++++++++++++--- .../coordination/ZkCoordinatorTest.java | 5 ++- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index cf5f09228f6..fe38811f332 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -228,7 +228,7 @@ public class CachingClusteredClient implements QueryRunner final QueryableDruidServer queryableDruidServer = segment.lhs.pick(); if (queryableDruidServer == null) { - log.error("No servers found for %s?! How can this be?!", segment.rhs); + log.makeAlert("No servers found for %s?! 
How can this be?!", segment.rhs).emit(); } else { final DruidServer server = queryableDruidServer.getServer(); List descriptors = serverSegments.get(server); diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index a1339790a20..78fc18fb92d 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -37,6 +37,9 @@ public class SegmentLoaderConfig @JsonProperty("deleteOnRemove") private boolean deleteOnRemove = true; + @JsonProperty("dropSegmentDelayMillis") + private int dropSegmentDelayMillis = 0; + @JsonProperty private File infoDir = null; @@ -50,6 +53,11 @@ public class SegmentLoaderConfig return deleteOnRemove; } + public int getDropSegmentDelayMillis() + { + return dropSegmentDelayMillis; + } + public File getInfoDir() { if (infoDir == null) { @@ -72,9 +80,10 @@ public class SegmentLoaderConfig public String toString() { return "SegmentLoaderConfig{" + - "locations=" + getLocations() + - ", deleteOnRemove=" + isDeleteOnRemove() + - ", infoDir=" + getInfoDir() + + "locations=" + locations + + ", deleteOnRemove=" + deleteOnRemove + + ", dropSegmentDelayMillis=" + dropSegmentDelayMillis + + ", infoDir=" + infoDir + '}'; } } diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 28edb8ddca2..b2ab3ec1151 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -20,8 +20,10 @@ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutorFactory; import 
com.metamx.emitter.EmittingLogger; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; @@ -32,6 +34,8 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** */ @@ -43,6 +47,7 @@ public class ZkCoordinator extends BaseZkCoordinator private final SegmentLoaderConfig config; private final DataSegmentAnnouncer announcer; private final ServerManager serverManager; + private final ScheduledExecutorService exec; @Inject public ZkCoordinator( @@ -52,7 +57,8 @@ public class ZkCoordinator extends BaseZkCoordinator DruidServerMetadata me, DataSegmentAnnouncer announcer, CuratorFramework curator, - ServerManager serverManager + ServerManager serverManager, + ScheduledExecutorFactory factory ) { super(jsonMapper, zkPaths, me, curator); @@ -61,6 +67,8 @@ public class ZkCoordinator extends BaseZkCoordinator this.config = config; this.announcer = announcer; this.serverManager = serverManager; + + this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); } @Override @@ -225,17 +233,33 @@ public class ZkCoordinator extends BaseZkCoordinator @Override - public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) + public void removeSegment(final DataSegment segment, DataSegmentChangeCallback callback) { try { - serverManager.dropSegment(segment); - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (!segmentInfoCacheFile.delete()) { log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); } announcer.unannounceSegment(segment); + + exec.schedule( + new Runnable() + { + @Override + public void run() + { + try { + serverManager.dropSegment(segment); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }, + config.getDropSegmentDelayMillis(), + TimeUnit.MILLISECONDS 
+ ); } catch (Exception e) { log.makeAlert(e, "Failed to remove segment") diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index d0a7b62f4be..f50ae3e16fc 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.LocalCacheProvider; @@ -117,7 +119,8 @@ public class ZkCoordinatorTest extends CuratorTestBase me, announcer, curator, - serverManager + serverManager, + ScheduledExecutors.createFactory(new Lifecycle()) ); } From 2aab2a0cd9a2afad78f10cd0c1740629e589c9a5 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 19 Jun 2014 16:48:43 -0700 Subject: [PATCH 2/4] address cr --- .../java/io/druid/segment/loading/SegmentLoaderConfig.java | 2 +- .../java/io/druid/server/coordination/ZkCoordinator.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index 78fc18fb92d..cefb275e946 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -38,7 +38,7 @@ public class SegmentLoaderConfig private boolean deleteOnRemove = true; @JsonProperty("dropSegmentDelayMillis") - private int dropSegmentDelayMillis = 0; + private int dropSegmentDelayMillis = 5 * 60 * 1000; // 5 mins 
@JsonProperty private File infoDir = null; diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index b2ab3ec1151..f2d3a2fd854 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -233,7 +233,7 @@ public class ZkCoordinator extends BaseZkCoordinator @Override - public void removeSegment(final DataSegment segment, DataSegmentChangeCallback callback) + public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback) { try { File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); @@ -243,6 +243,7 @@ public class ZkCoordinator extends BaseZkCoordinator announcer.unannounceSegment(segment); + log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); exec.schedule( new Runnable() { @@ -253,6 +254,9 @@ public class ZkCoordinator extends BaseZkCoordinator serverManager.dropSegment(segment); } catch (Exception e) { + log.makeAlert(e, "Failed to remove segment! 
Possible resource leak!") + .addData("segment", segment) + .emit(); throw Throwables.propagate(e); } } From fc36bfbc6db5b2e6a42dd43a585e9505871277a5 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 19 Jun 2014 16:49:50 -0700 Subject: [PATCH 3/4] remove pointless exception --- .../main/java/io/druid/server/coordination/ZkCoordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index f2d3a2fd854..7a91d542b96 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -257,7 +257,6 @@ public class ZkCoordinator extends BaseZkCoordinator log.makeAlert(e, "Failed to remove segment! Possible resource leak!") .addData("segment", segment) .emit(); - throw Throwables.propagate(e); } } }, From 5afcb9cbd3a3efc287f430f6c405ce024427da8d Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 19 Jun 2014 16:55:02 -0700 Subject: [PATCH 4/4] add docs --- docs/content/Configuration.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/Configuration.md b/docs/content/Configuration.md index 35269241c72..264efb91ecd 100644 --- a/docs/content/Configuration.md +++ b/docs/content/Configuration.md @@ -154,6 +154,7 @@ Druid storage nodes maintain information about segments they have already downlo |--------|-----------|-------| |`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. 
| none (no caching) | |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true| +|`druid.segmentCache.dropSegmentDelayMillis`|How long, in milliseconds, a node delays before completely dropping a segment.|300000 (5 minutes)| |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir| ### Jetty Server Module