diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java index e5ccdf5e2e9..d2262d0ccb3 100644 --- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java +++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java @@ -20,6 +20,7 @@ package io.druid.storage.hdfs; import com.google.inject.Inject; +import com.metamx.common.logger.Logger; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; @@ -31,6 +32,10 @@ import java.io.IOException; public class HdfsDataSegmentKiller implements DataSegmentKiller { + private static final Logger log = new Logger(HdfsDataSegmentKiller.class); + + private static final String PATH_KEY = "path"; + private final Configuration config; @Inject @@ -43,6 +48,8 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller public void kill(DataSegment segment) throws SegmentLoadingException { final Path path = getPath(segment); + log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path); + final FileSystem fs = checkPathAndGetFilesystem(path); try { if (path.getName().endsWith(".zip")) { @@ -86,7 +93,7 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller private Path getPath(DataSegment segment) { - return new Path(String.valueOf(segment.getLoadSpec().get("path"))); + return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY))); } private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java index 8af6fafca9a..80633ecc1cf 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java @@ -22,9 +22,10 @@ package io.druid.segment.loading; import com.metamx.common.MapUtils; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; import java.io.File; -import java.util.Map; +import java.io.IOException; /** */ @@ -32,53 +33,41 @@ public class LocalDataSegmentKiller implements DataSegmentKiller { private static final Logger log = new Logger(LocalDataSegmentKiller.class); + private static final String PATH_KEY = "path"; + @Override public void kill(DataSegment segment) throws SegmentLoadingException { - final File path = getDirectory(segment); - log.info("segment[%s] maps to path[%s]", segment.getIdentifier(), path); + final File path = getPath(segment); + log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path); - if (!path.isDirectory()) { - if (!path.delete()) { - log.error("Unable to delete file[%s].", path); - throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); - } + try { + if (path.getName().endsWith(".zip")) { - return; - } + // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip + File partitionNumDir = path.getParentFile(); + FileUtils.deleteDirectory(partitionNumDir); - final File[] files = path.listFiles(); - int success = 0; - - for (File file : files) { - if (!file.delete()) { - log.error("Unable to delete file[%s].", file); + //try to delete other directories if possible + File versionDir = partitionNumDir.getParentFile(); + if (versionDir.delete()) { + File intervalDir = versionDir.getParentFile(); + if (intervalDir.delete()) { + File dataSourceDir = intervalDir.getParentFile(); + dataSourceDir.delete(); + } + } } else { - ++success; + throw new SegmentLoadingException("Unknown file type[%s]", path); } } - - if (success == 0 && files.length != 0) { - throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); - } - - if (success < files.length) { - log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier()); - } else if (!path.delete()) { - log.warn("Unable to delete directory[%s].", path); - log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier()); + catch (IOException e) { + throw new SegmentLoadingException(e, "Unable to kill segment"); } } - private File getDirectory(DataSegment segment) throws SegmentLoadingException + private File getPath(DataSegment segment) throws SegmentLoadingException { - final Map loadSpec = segment.getLoadSpec(); - final File path = new File(MapUtils.getString(loadSpec, "path")); - - if (!path.exists()) { - throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path); - } - - return path.getParentFile(); + return new File(MapUtils.getString(segment.getLoadSpec(), PATH_KEY)); } } diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java index e6c10f25e0a..e172afe1488 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java @@ -19,66 +19,101 @@ package io.druid.segment.loading; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.io.Files; import io.druid.timeline.DataSegment; -import org.apache.commons.io.FileUtils; +import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; -import java.util.Arrays; public class LocalDataSegmentKillerTest { - private File tmpDir; - private File segmentDir; - - private LocalDataSegmentKiller killer; - private DataSegment segment; - - @Before - public void setUp() throws IOException - { - tmpDir = Files.createTempDir(); - tmpDir.deleteOnExit(); - segmentDir = new File( - tmpDir.getCanonicalPath() - + "/druid/localStorage/wikipedia/2015-04-09T15:02:00.000Z_2015-04-09T15:03:00.000Z/2015-04-09T15:02:00.000Z/0/" - ); - segmentDir.mkdirs(); - Files.touch(new File(segmentDir.getCanonicalPath() + "/index.zip")); - Files.touch(new File(segmentDir.getCanonicalPath() + "/description.json")); - - killer = new LocalDataSegmentKiller(); - segment = new DataSegment( - "test", - new Interval("2015-04-09T15:02:00.000Z/2015-04-09T15:03:00.000Z"), - "1", - ImmutableMap.of("path", segmentDir.getCanonicalPath() + "/index.zip"), - Arrays.asList("d"), - Arrays.asList("m"), - null, - null, - 1234L - ); - } - - @After - public void tearDown() throws Exception - { - FileUtils.deleteDirectory(tmpDir); - } + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testKill() throws SegmentLoadingException + public void testKill() throws Exception { - killer.kill(segment); - Assert.assertTrue(!segmentDir.exists()); + LocalDataSegmentKiller killer = new LocalDataSegmentKiller(); + + // Create following segments and then delete them in this order and assert directory deletions + // /tmp/dataSource/interval1/v1/0/index.zip + // /tmp/dataSource/interval1/v1/1/index.zip + // /tmp/dataSource/interval1/v2/0/index.zip + // /tmp/dataSource/interval2/v1/0/index.zip + + File dataSourceDir = temporaryFolder.newFolder(); + + File interval1Dir = new File(dataSourceDir, "interval1"); + File version11Dir = new File(interval1Dir, "v1"); + File partition011Dir = new File(version11Dir, "0"); + File partition111Dir = new File(version11Dir, "1"); + + makePartitionDirWithIndex(partition011Dir); + makePartitionDirWithIndex(partition111Dir); + + File version21Dir = new File(interval1Dir, "v2"); + File partition021Dir = new File(version21Dir, "0"); + + makePartitionDirWithIndex(partition021Dir); + + File interval2Dir = new File(dataSourceDir, "interval2"); + File version12Dir = new File(interval2Dir, "v1"); + File partition012Dir = new File(version12Dir, "0"); + + makePartitionDirWithIndex(partition012Dir); + + killer.kill(getSegmentWithPath(new File(partition011Dir, "index.zip").toString())); + + Assert.assertFalse(partition011Dir.exists()); + Assert.assertTrue(partition111Dir.exists()); + Assert.assertTrue(partition021Dir.exists()); + Assert.assertTrue(partition012Dir.exists()); + + killer.kill(getSegmentWithPath(new File(partition111Dir, "index.zip").toString())); + + Assert.assertFalse(version11Dir.exists()); + Assert.assertTrue(partition021Dir.exists()); + Assert.assertTrue(partition012Dir.exists()); + + killer.kill(getSegmentWithPath(new File(partition021Dir, "index.zip").toString())); + + Assert.assertFalse(interval1Dir.exists()); + Assert.assertTrue(partition012Dir.exists()); + + killer.kill(getSegmentWithPath(new File(partition012Dir, "index.zip").toString())); + + Assert.assertFalse(dataSourceDir.exists()); + } + + private void makePartitionDirWithIndex(File path) throws IOException + { + Assert.assertTrue(path.mkdirs()); + Assert.assertTrue(new File(path, "index.zip").createNewFile()); + } + + private DataSegment getSegmentWithPath(String path) + { + return new DataSegment( + "dataSource", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", path + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ); } }