diff --git a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java
new file mode 100644
index 00000000000..4d4ddf70766
--- /dev/null
+++ b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java
@@ -0,0 +1,76 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.storage.hdfs;
+
+import com.google.inject.Inject;
+import io.druid.segment.loading.DataSegmentKiller;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public class HdfsDataSegmentKiller implements DataSegmentKiller
+{
+  private final Configuration config;
+
+  @Inject
+  public HdfsDataSegmentKiller(final Configuration config)
+  {
+    this.config = config;
+  }
+
+  @Override
+  public void kill(DataSegment segment) throws SegmentLoadingException
+  {
+    final Path path = getPath(segment);
+    final FileSystem fs = checkPathAndGetFilesystem(path);
+    try {
+      fs.delete(path, true);
+    }
+    catch (IOException e) {
+      throw new SegmentLoadingException(e, "Unable to kill segment");
+    }
+  }
+
+  private Path getPath(DataSegment segment)
+  {
+    return new Path(String.valueOf(segment.getLoadSpec().get("path")));
+  }
+
+  private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException
+  {
+    FileSystem fs;
+    try {
+      fs = path.getFileSystem(config);
+
+      if (!fs.exists(path)) {
+        throw new SegmentLoadingException("Path[%s] doesn't exist.", path);
+      }
+
+      return fs;
+    }
+    catch (IOException e) {
+      throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path);
+    }
+  }
+}
diff --git a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java
index e2fb1475742..0ebb477a593 100644
--- a/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -55,6 +55,7 @@ public class HdfsStorageDruidModule implements DruidModule
   {
     Binders.dataSegmentPullerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPuller.class).in(LazySingleton.class);
     Binders.dataSegmentPusherBinder(binder).addBinding("hdfs").to(HdfsDataSegmentPusher.class).in(LazySingleton.class);
+    Binders.dataSegmentKillerBinder(binder).addBinding("hdfs").to(HdfsDataSegmentKiller.class).in(LazySingleton.class);
 
     final Configuration conf = new Configuration();
     if (props != null) {
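
For reference, a minimal sketch of the deletion path the new killer wraps, using only Hadoop `FileSystem` APIs that the patch itself calls (`getFileSystem`, `exists`, `delete`). The class name `KillSketch` and the `/tmp/druid/segments/example` path are hypothetical stand-ins; in the patch the path comes from the segment's loadSpec `"path"` key and the `Configuration` is injected by Guice rather than constructed inline.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class KillSketch
{
  public static void main(String[] args) throws IOException
  {
    // Hypothetical setup: a default Configuration resolves to the local
    // filesystem unless core-site.xml points at an HDFS cluster.
    final Configuration conf = new Configuration();
    final Path segmentPath = new Path("/tmp/druid/segments/example");

    // Same sequence as checkPathAndGetFilesystem() followed by kill():
    // resolve the filesystem from the path, verify existence, then
    // recursively delete the segment directory.
    final FileSystem fs = segmentPath.getFileSystem(conf);
    if (fs.exists(segmentPath)) {
      final boolean deleted = fs.delete(segmentPath, true);
      System.out.println("deleted = " + deleted);
    }
  }
}
```

One design note on the patch: `FileSystem.delete` reports failure through its boolean return value rather than by throwing, so `kill()` as written only surfaces `IOException`s; a delete that returns `false` would pass silently. Whether to check that return value is left as written here.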