From c2bebfe39e9d4507edb5560a93a0def262e39ea9 Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Sun, 23 Aug 2015 22:06:12 -0500
Subject: [PATCH 1/2] delete version, interval, dataSource directories on segment deletion if possible, so that they are not left behind and consume ns quota on HDFS

---
 .../storage/hdfs/HdfsDataSegmentKiller.java | 31 +++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java
index 8a77ad85f95..d1314162252 100644
--- a/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java
+++ b/extensions/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java
@@ -44,8 +44,25 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
     final FileSystem fs = checkPathAndGetFilesystem(path);
     try {
       if (path.getName().endsWith(".zip")) {
-        // delete the parent directory containing the zip file and the descriptor
-        fs.delete(path.getParent(), true);
+
+        // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+        Path partitionNumDir = path.getParent();
+        if (!fs.delete(partitionNumDir, true)) {
+          throw new SegmentLoadingException(
+              "Unable to kill segment, failed to delete dir [%s]",
+              partitionNumDir.toString()
+          );
+        }
+
+        //try to delete other directories if possible
+        Path versionDir = partitionNumDir.getParent();
+        if (safeNonRecursiveDelete(fs, versionDir)) {
+          Path intervalDir = versionDir.getParent();
+          if (safeNonRecursiveDelete(fs, intervalDir)) {
+            Path dataSourceDir = intervalDir.getParent();
+            safeNonRecursiveDelete(fs, dataSourceDir);
+          }
+        }
       } else {
         throw new SegmentLoadingException("Unknown file type[%s]", path);
       }
@@ -55,6 +72,16 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
     }
   }
 
+  private boolean safeNonRecursiveDelete(FileSystem fs, Path path)
+  {
+    try {
+      return fs.delete(path, false);
+    }
+    catch (Exception ex) {
+      return false;
+    }
+  }
+
   private Path getPath(DataSegment segment)
   {
     return new Path(String.valueOf(segment.getLoadSpec().get("path")));

From 5b5a76ef6cca2ab88768470624469060f53f1a58 Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Sun, 23 Aug 2015 22:06:46 -0500
Subject: [PATCH 2/2] adding unit test for HdfsDataSegmentKiller.testKill(..)

---
 .../hdfs/HdfsDataSegmentKillerTest.java | 122 ++++++++++++++++++
 1 file changed, 122 insertions(+)
 create mode 100644 extensions/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java

diff --git a/extensions/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java b/extensions/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
new file mode 100644
index 00000000000..74dca25e42f
--- /dev/null
+++ b/extensions/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java
@@ -0,0 +1,122 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.storage.hdfs;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ */
+public class HdfsDataSegmentKillerTest
+{
+  @Test
+  public void testKill() throws Exception
+  {
+    Configuration config = new Configuration();
+    HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(config);
+
+    FileSystem fs = FileSystem.get(config);
+
+    // Create following segments and then delete them in this order and assert directory deletions
+    // /tmp/dataSource/interval1/v1/0/index.zip
+    // /tmp/dataSource/interval1/v1/1/index.zip
+    // /tmp/dataSource/interval1/v2/0/index.zip
+    // /tmp/dataSource/interval2/v1/0/index.zip
+
+    Path dataSourceDir = new Path("/tmp/dataSource");
+
+    Path interval1Dir = new Path(dataSourceDir, "interval1");
+    Path version11Dir = new Path(interval1Dir, "v1");
+    Path partition011Dir = new Path(version11Dir, "0");
+    Path partition111Dir = new Path(version11Dir, "1");
+
+    makePartitionDirWithIndex(fs, partition011Dir);
+    makePartitionDirWithIndex(fs, partition111Dir);
+
+    Path version21Dir = new Path(interval1Dir, "v2");
+    Path partition021Dir = new Path(version21Dir, "0");
+
+    makePartitionDirWithIndex(fs, partition021Dir);
+
+    Path interval2Dir = new Path(dataSourceDir, "interval2");
+    Path version12Dir = new Path(interval2Dir, "v1");
+    Path partition012Dir = new Path(version12Dir, "0");
+
+    makePartitionDirWithIndex(fs, partition012Dir);
+
+    killer.kill(getSegmentWithPath(new Path(partition011Dir, "index.zip").toString()));
+
+    Assert.assertFalse(fs.exists(partition011Dir));
+    Assert.assertTrue(fs.exists(partition111Dir));
+    Assert.assertTrue(fs.exists(partition021Dir));
+    Assert.assertTrue(fs.exists(partition012Dir));
+
+    killer.kill(getSegmentWithPath(new Path(partition111Dir, "index.zip").toString()));
+
+    Assert.assertFalse(fs.exists(version11Dir));
+    Assert.assertTrue(fs.exists(partition021Dir));
+    Assert.assertTrue(fs.exists(partition012Dir));
+
+    killer.kill(getSegmentWithPath(new Path(partition021Dir, "index.zip").toString()));
+
+    Assert.assertFalse(fs.exists(interval1Dir));
+    Assert.assertTrue(fs.exists(partition012Dir));
+
+    killer.kill(getSegmentWithPath(new Path(partition012Dir, "index.zip").toString()));
+
+    Assert.assertFalse(fs.exists(dataSourceDir));
+  }
+
+  private void makePartitionDirWithIndex(FileSystem fs, Path path) throws IOException
+  {
+    Assert.assertTrue(fs.mkdirs(path));
+    try (FSDataOutputStream os = fs.create(new Path(path, "index.zip"))) {
+    }
+  }
+
+  private DataSegment getSegmentWithPath(String path)
+  {
+    return new DataSegment(
+        "dataSource",
+        Interval.parse("2000/3000"),
+        "ver",
+        ImmutableMap.of(
+            "type", "hdfs",
+            "path", path
+        ),
+        ImmutableList.of("product"),
+        ImmutableList.of("visited_sum", "unique_hosts"),
+        new NoneShardSpec(),
+        9,
+        12334
+    );
+  }
+}