Merge pull request #423 from tucksaun/fix-kill-task-local-storage

Adds LocalDataSegmentKiller to allow KillTask for local storage
This commit is contained in:
fjy 2014-03-07 13:45:39 -07:00
commit 3ef75031c3
3 changed files with 78 additions and 3 deletions

View File

@ -23,15 +23,17 @@ import com.google.inject.Binder;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPusher; import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig; import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.LocalDataSegmentKiller;
import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoader;
/** /**
*/ */
public class DataSegmentPusherPullerModule implements Module public class LocalDataStorageDruidModule implements Module
{ {
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
@ -52,6 +54,11 @@ public class DataSegmentPusherPullerModule implements Module
.to(LocalDataSegmentPuller.class) .to(LocalDataSegmentPuller.class)
.in(LazySingleton.class); .in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentKiller.class))
.addBinding("local")
.to(LocalDataSegmentKiller.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class)) PolyBind.optionBinder(binder, Key.get(DataSegmentPusher.class))
.addBinding("local") .addBinding("local")
.to(LocalDataSegmentPusher.class) .to(LocalDataSegmentPusher.class)

View File

@ -36,7 +36,7 @@ import io.druid.curator.CuratorModule;
import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.DiscoveryModule;
import io.druid.guice.AWSModule; import io.druid.guice.AWSModule;
import io.druid.guice.AnnouncerModule; import io.druid.guice.AnnouncerModule;
import io.druid.guice.DataSegmentPusherPullerModule; import io.druid.guice.LocalDataStorageDruidModule;
import io.druid.guice.DbConnectorModule; import io.druid.guice.DbConnectorModule;
import io.druid.guice.DruidGuiceExtensions; import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidProcessingModule;
@ -316,7 +316,7 @@ public class Initialization
new DbConnectorModule(), new DbConnectorModule(),
new JacksonConfigManagerModule(), new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(), new IndexingServiceDiscoveryModule(),
new DataSegmentPusherPullerModule(), new LocalDataStorageDruidModule(),
new FirehoseModule() new FirehoseModule()
); );

View File

@ -0,0 +1,68 @@
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.util.Map;
/**
*/
public class LocalDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(LocalDataSegmentKiller.class);
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
final File path = getDirectory(segment);
log.info("segment[%s] maps to path[%s]", segment.getIdentifier(), path);
if (!path.isDirectory()) {
if (!path.delete()) {
log.error("Unable to delete file[%s].", path);
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
}
return;
}
final File[] files = path.listFiles();
int success = 0;
for (File file : files) {
if (!file.delete()) {
log.error("Unable to delete file[%s].", file);
} else {
++success;
}
}
if (success == 0 && files.length != 0) {
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
}
if (success < files.length) {
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
} else if (!path.delete()) {
log.warn("Unable to delete directory[%s].", path);
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
}
}
private File getDirectory(DataSegment segment) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final File path = new File(MapUtils.getString(loadSpec, "path"));
if (!path.exists()) {
throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path);
}
return path.getParentFile();
}
}