From 7dbcc745103d0c1f4843ca4689866060da733dd4 Mon Sep 17 00:00:00 2001 From: Tugdual Saunier Date: Fri, 7 Mar 2014 11:28:33 +0000 Subject: [PATCH] Adds LocalDataSegmentKiller to allow KillTask for local storage --- ....java => LocalDataStorageDruidModule.java} | 9 ++- .../druid/initialization/Initialization.java | 4 +- .../loading/LocalDataSegmentKiller.java | 68 +++++++++++++++++++ 3 files changed, 78 insertions(+), 3 deletions(-) rename server/src/main/java/io/druid/guice/{DataSegmentPusherPullerModule.java => LocalDataStorageDruidModule.java} (86%) create mode 100644 server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java diff --git a/server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java similarity index 86% rename from server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java rename to server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java index af45ea18f17..4511359a522 100644 --- a/server/src/main/java/io/druid/guice/DataSegmentPusherPullerModule.java +++ b/server/src/main/java/io/druid/guice/LocalDataStorageDruidModule.java @@ -23,15 +23,17 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusherConfig; +import io.druid.segment.loading.LocalDataSegmentKiller; import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoader; /** */ -public class DataSegmentPusherPullerModule implements Module +public class LocalDataStorageDruidModule implements Module { @Override public void configure(Binder binder) @@ -52,6 +54,11 @@ public class DataSegmentPusherPullerModule implements Module .to(LocalDataSegmentPuller.class) .in(LazySingleton.class); + PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class)) + .addBinding("local") + .to(LocalDataSegmentKiller.class) + .in(LazySingleton.class); + PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) .addBinding("local") .to(LocalDataSegmentPusher.class) diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index 5e8e0461202..37dcc5821b0 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -36,7 +36,7 @@ import io.druid.curator.CuratorModule; import io.druid.curator.discovery.DiscoveryModule; import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; -import io.druid.guice.DataSegmentPusherPullerModule; +import io.druid.guice.LocalDataStorageDruidModule; import io.druid.guice.DbConnectorModule; import io.druid.guice.DruidGuiceExtensions; import io.druid.guice.DruidProcessingModule; @@ -316,7 +316,7 @@ public class Initialization new DbConnectorModule(), new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), - new DataSegmentPusherPullerModule(), + new LocalDataStorageDruidModule(), new FirehoseModule() ); diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java new file mode 100644 index 00000000000..014805e4d0b --- /dev/null +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java @@ -0,0 +1,68 @@ +package io.druid.segment.loading; + +import com.google.inject.Inject; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; + +import java.io.File; +import java.util.Map; + +/** + */ +public class LocalDataSegmentKiller implements DataSegmentKiller +{ + private static final Logger log = new Logger(LocalDataSegmentKiller.class); + + @Override + public void kill(DataSegment segment) throws SegmentLoadingException + { + final File path = getDirectory(segment); + log.info("segment[%s] maps to path[%s]", segment.getIdentifier(), path); + + if (!path.isDirectory()) { + if (!path.delete()) { + log.error("Unable to delete file[%s].", path); + throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); + } + + return; + } + + final File[] files = path.listFiles(); + int success = 0; + + for (File file : files) { + if (!file.delete()) { + log.error("Unable to delete file[%s].", file); + } else { + ++success; + } + } + + if (success == 0 && files.length != 0) { + throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); + } + + if (success < files.length) { + log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier()); + } else if (!path.delete()) { + log.warn("Unable to delete directory[%s].", path); + log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier()); + } + } + + private File getDirectory(DataSegment segment) throws SegmentLoadingException + { + final Map loadSpec = segment.getLoadSpec(); + final File path = new File(MapUtils.getString(loadSpec, "path")); + + if (!path.exists()) { + throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path); + } + + return path.getParentFile(); + } +}