Merge pull request #2275 from himanshug/local_segment_killer_fix

kill unwanted parent directories when a segment is deleted from LocalDataSegmentKiller
This commit is contained in:
Fangjin Yang 2016-01-18 22:54:01 -08:00
commit a2e327ed08
3 changed files with 114 additions and 83 deletions

View File

@ -20,6 +20,7 @@
package io.druid.storage.hdfs; package io.druid.storage.hdfs;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -31,6 +32,10 @@ import java.io.IOException;
public class HdfsDataSegmentKiller implements DataSegmentKiller public class HdfsDataSegmentKiller implements DataSegmentKiller
{ {
private static final Logger log = new Logger(HdfsDataSegmentKiller.class);
private static final String PATH_KEY = "path";
private final Configuration config; private final Configuration config;
@Inject @Inject
@ -43,6 +48,8 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
public void kill(DataSegment segment) throws SegmentLoadingException public void kill(DataSegment segment) throws SegmentLoadingException
{ {
final Path path = getPath(segment); final Path path = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path);
final FileSystem fs = checkPathAndGetFilesystem(path); final FileSystem fs = checkPathAndGetFilesystem(path);
try { try {
if (path.getName().endsWith(".zip")) { if (path.getName().endsWith(".zip")) {
@ -86,7 +93,7 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
private Path getPath(DataSegment segment) private Path getPath(DataSegment segment)
{ {
return new Path(String.valueOf(segment.getLoadSpec().get("path"))); return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY)));
} }
private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException

View File

@ -22,9 +22,10 @@ package io.druid.segment.loading;
import com.metamx.common.MapUtils; import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import java.io.File; import java.io.File;
import java.util.Map; import java.io.IOException;
/** /**
*/ */
@ -32,53 +33,41 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
{ {
private static final Logger log = new Logger(LocalDataSegmentKiller.class); private static final Logger log = new Logger(LocalDataSegmentKiller.class);
private static final String PATH_KEY = "path";
@Override @Override
public void kill(DataSegment segment) throws SegmentLoadingException public void kill(DataSegment segment) throws SegmentLoadingException
{ {
final File path = getDirectory(segment); final File path = getPath(segment);
log.info("segment[%s] maps to path[%s]", segment.getIdentifier(), path); log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path);
if (!path.isDirectory()) { try {
if (!path.delete()) { if (path.getName().endsWith(".zip")) {
log.error("Unable to delete file[%s].", path);
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier()); // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
File partitionNumDir = path.getParentFile();
FileUtils.deleteDirectory(partitionNumDir);
//try to delete other directories if possible
File versionDir = partitionNumDir.getParentFile();
if (versionDir.delete()) {
File intervalDir = versionDir.getParentFile();
if (intervalDir.delete()) {
File dataSourceDir = intervalDir.getParentFile();
dataSourceDir.delete();
} }
return;
} }
final File[] files = path.listFiles();
int success = 0;
for (File file : files) {
if (!file.delete()) {
log.error("Unable to delete file[%s].", file);
} else { } else {
++success; throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to kill segment");
} }
} }
if (success == 0 && files.length != 0) { private File getPath(DataSegment segment) throws SegmentLoadingException
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
}
if (success < files.length) {
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
} else if (!path.delete()) {
log.warn("Unable to delete directory[%s].", path);
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
}
}
private File getDirectory(DataSegment segment) throws SegmentLoadingException
{ {
final Map<String, Object> loadSpec = segment.getLoadSpec(); return new File(MapUtils.getString(segment.getLoadSpec(), PATH_KEY));
final File path = new File(MapUtils.getString(loadSpec, "path"));
if (!path.exists()) {
throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path);
}
return path.getParentFile();
} }
} }

View File

@ -19,66 +19,101 @@
package io.druid.segment.loading; package io.druid.segment.loading;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils; import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
public class LocalDataSegmentKillerTest public class LocalDataSegmentKillerTest
{ {
private File tmpDir; @Rule
private File segmentDir; public TemporaryFolder temporaryFolder = new TemporaryFolder();
private LocalDataSegmentKiller killer;
private DataSegment segment;
@Before
public void setUp() throws IOException
{
tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
segmentDir = new File(
tmpDir.getCanonicalPath()
+ "/druid/localStorage/wikipedia/2015-04-09T15:02:00.000Z_2015-04-09T15:03:00.000Z/2015-04-09T15:02:00.000Z/0/"
);
segmentDir.mkdirs();
Files.touch(new File(segmentDir.getCanonicalPath() + "/index.zip"));
Files.touch(new File(segmentDir.getCanonicalPath() + "/description.json"));
killer = new LocalDataSegmentKiller();
segment = new DataSegment(
"test",
new Interval("2015-04-09T15:02:00.000Z/2015-04-09T15:03:00.000Z"),
"1",
ImmutableMap.<String, Object>of("path", segmentDir.getCanonicalPath() + "/index.zip"),
Arrays.asList("d"),
Arrays.asList("m"),
null,
null,
1234L
);
}
@After
public void tearDown() throws Exception
{
FileUtils.deleteDirectory(tmpDir);
}
@Test @Test
public void testKill() throws SegmentLoadingException public void testKill() throws Exception
{ {
killer.kill(segment); LocalDataSegmentKiller killer = new LocalDataSegmentKiller();
Assert.assertTrue(!segmentDir.exists());
// Create following segments and then delete them in this order and assert directory deletions
// /tmp/dataSource/interval1/v1/0/index.zip
// /tmp/dataSource/interval1/v1/1/index.zip
// /tmp/dataSource/interval1/v2/0/index.zip
// /tmp/dataSource/interval2/v1/0/index.zip
File dataSourceDir = temporaryFolder.newFolder();
File interval1Dir = new File(dataSourceDir, "interval1");
File version11Dir = new File(interval1Dir, "v1");
File partition011Dir = new File(version11Dir, "0");
File partition111Dir = new File(version11Dir, "1");
makePartitionDirWithIndex(partition011Dir);
makePartitionDirWithIndex(partition111Dir);
File version21Dir = new File(interval1Dir, "v2");
File partition021Dir = new File(version21Dir, "0");
makePartitionDirWithIndex(partition021Dir);
File interval2Dir = new File(dataSourceDir, "interval2");
File version12Dir = new File(interval2Dir, "v1");
File partition012Dir = new File(version12Dir, "0");
makePartitionDirWithIndex(partition012Dir);
killer.kill(getSegmentWithPath(new File(partition011Dir, "index.zip").toString()));
Assert.assertFalse(partition011Dir.exists());
Assert.assertTrue(partition111Dir.exists());
Assert.assertTrue(partition021Dir.exists());
Assert.assertTrue(partition012Dir.exists());
killer.kill(getSegmentWithPath(new File(partition111Dir, "index.zip").toString()));
Assert.assertFalse(version11Dir.exists());
Assert.assertTrue(partition021Dir.exists());
Assert.assertTrue(partition012Dir.exists());
killer.kill(getSegmentWithPath(new File(partition021Dir, "index.zip").toString()));
Assert.assertFalse(interval1Dir.exists());
Assert.assertTrue(partition012Dir.exists());
killer.kill(getSegmentWithPath(new File(partition012Dir, "index.zip").toString()));
Assert.assertFalse(dataSourceDir.exists());
}
private void makePartitionDirWithIndex(File path) throws IOException
{
Assert.assertTrue(path.mkdirs());
Assert.assertTrue(new File(path, "index.zip").createNewFile());
}
private DataSegment getSegmentWithPath(String path)
{
return new DataSegment(
"dataSource",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", path
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
);
} }
} }