mirror of https://github.com/apache/druid.git
kill unwanted parent directories when a segment is deleted from LocalDataSegmentKiller
This commit is contained in:
parent
592638f6b3
commit
637d2605e7
|
@ -22,9 +22,10 @@ package io.druid.segment.loading;
|
|||
import com.metamx.common.MapUtils;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -32,53 +33,41 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
|
|||
{
|
||||
private static final Logger log = new Logger(LocalDataSegmentKiller.class);
|
||||
|
||||
private static final String PATH_KEY = "path";
|
||||
|
||||
@Override
|
||||
public void kill(DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
final File path = getDirectory(segment);
|
||||
log.info("segment[%s] maps to path[%s]", segment.getIdentifier(), path);
|
||||
final File path = getPath(segment);
|
||||
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path);
|
||||
|
||||
if (!path.isDirectory()) {
|
||||
if (!path.delete()) {
|
||||
log.error("Unable to delete file[%s].", path);
|
||||
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
|
||||
try {
|
||||
if (path.getName().endsWith(".zip")) {
|
||||
|
||||
// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
|
||||
File partitionNumDir = path.getParentFile();
|
||||
FileUtils.deleteDirectory(partitionNumDir);
|
||||
|
||||
//try to delete other directories if possible
|
||||
File versionDir = partitionNumDir.getParentFile();
|
||||
if (versionDir.delete()) {
|
||||
File intervalDir = versionDir.getParentFile();
|
||||
if (intervalDir.delete()) {
|
||||
File dataSourceDir = intervalDir.getParentFile();
|
||||
dataSourceDir.delete();
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
final File[] files = path.listFiles();
|
||||
int success = 0;
|
||||
|
||||
for (File file : files) {
|
||||
if (!file.delete()) {
|
||||
log.error("Unable to delete file[%s].", file);
|
||||
} else {
|
||||
++success;
|
||||
throw new SegmentLoadingException("Unknown file type[%s]", path);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SegmentLoadingException(e, "Unable to kill segment");
|
||||
}
|
||||
}
|
||||
|
||||
if (success == 0 && files.length != 0) {
|
||||
throw new SegmentLoadingException("Couldn't kill segment[%s]", segment.getIdentifier());
|
||||
}
|
||||
|
||||
if (success < files.length) {
|
||||
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
|
||||
} else if (!path.delete()) {
|
||||
log.warn("Unable to delete directory[%s].", path);
|
||||
log.warn("Couldn't completely kill segment[%s]", segment.getIdentifier());
|
||||
}
|
||||
}
|
||||
|
||||
private File getDirectory(DataSegment segment) throws SegmentLoadingException
|
||||
private File getPath(DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
final Map<String, Object> loadSpec = segment.getLoadSpec();
|
||||
final File path = new File(MapUtils.getString(loadSpec, "path"));
|
||||
|
||||
if (!path.exists()) {
|
||||
throw new SegmentLoadingException("Asked to load path[%s], but it doesn't exist.", path);
|
||||
}
|
||||
|
||||
return path.getParentFile();
|
||||
return new File(MapUtils.getString(segment.getLoadSpec(), PATH_KEY));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,66 +19,101 @@
|
|||
|
||||
package io.druid.segment.loading;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.Files;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class LocalDataSegmentKillerTest
|
||||
{
|
||||
|
||||
private File tmpDir;
|
||||
private File segmentDir;
|
||||
|
||||
private LocalDataSegmentKiller killer;
|
||||
private DataSegment segment;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException
|
||||
{
|
||||
tmpDir = Files.createTempDir();
|
||||
tmpDir.deleteOnExit();
|
||||
segmentDir = new File(
|
||||
tmpDir.getCanonicalPath()
|
||||
+ "/druid/localStorage/wikipedia/2015-04-09T15:02:00.000Z_2015-04-09T15:03:00.000Z/2015-04-09T15:02:00.000Z/0/"
|
||||
);
|
||||
segmentDir.mkdirs();
|
||||
Files.touch(new File(segmentDir.getCanonicalPath() + "/index.zip"));
|
||||
Files.touch(new File(segmentDir.getCanonicalPath() + "/description.json"));
|
||||
|
||||
killer = new LocalDataSegmentKiller();
|
||||
segment = new DataSegment(
|
||||
"test",
|
||||
new Interval("2015-04-09T15:02:00.000Z/2015-04-09T15:03:00.000Z"),
|
||||
"1",
|
||||
ImmutableMap.<String, Object>of("path", segmentDir.getCanonicalPath() + "/index.zip"),
|
||||
Arrays.asList("d"),
|
||||
Arrays.asList("m"),
|
||||
null,
|
||||
null,
|
||||
1234L
|
||||
);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
FileUtils.deleteDirectory(tmpDir);
|
||||
}
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testKill() throws SegmentLoadingException
|
||||
public void testKill() throws Exception
|
||||
{
|
||||
killer.kill(segment);
|
||||
Assert.assertTrue(!segmentDir.exists());
|
||||
LocalDataSegmentKiller killer = new LocalDataSegmentKiller();
|
||||
|
||||
// Create following segments and then delete them in this order and assert directory deletions
|
||||
// /tmp/dataSource/interval1/v1/0/index.zip
|
||||
// /tmp/dataSource/interval1/v1/1/index.zip
|
||||
// /tmp/dataSource/interval1/v2/0/index.zip
|
||||
// /tmp/dataSource/interval2/v1/0/index.zip
|
||||
|
||||
File dataSourceDir = temporaryFolder.newFolder();
|
||||
|
||||
File interval1Dir = new File(dataSourceDir, "interval1");
|
||||
File version11Dir = new File(interval1Dir, "v1");
|
||||
File partition011Dir = new File(version11Dir, "0");
|
||||
File partition111Dir = new File(version11Dir, "1");
|
||||
|
||||
makePartitionDirWithIndex(partition011Dir);
|
||||
makePartitionDirWithIndex(partition111Dir);
|
||||
|
||||
File version21Dir = new File(interval1Dir, "v2");
|
||||
File partition021Dir = new File(version21Dir, "0");
|
||||
|
||||
makePartitionDirWithIndex(partition021Dir);
|
||||
|
||||
File interval2Dir = new File(dataSourceDir, "interval2");
|
||||
File version12Dir = new File(interval2Dir, "v1");
|
||||
File partition012Dir = new File(version12Dir, "0");
|
||||
|
||||
makePartitionDirWithIndex(partition012Dir);
|
||||
|
||||
killer.kill(getSegmentWithPath(new File(partition011Dir, "index.zip").toString()));
|
||||
|
||||
Assert.assertFalse(partition011Dir.exists());
|
||||
Assert.assertTrue(partition111Dir.exists());
|
||||
Assert.assertTrue(partition021Dir.exists());
|
||||
Assert.assertTrue(partition012Dir.exists());
|
||||
|
||||
killer.kill(getSegmentWithPath(new File(partition111Dir, "index.zip").toString()));
|
||||
|
||||
Assert.assertFalse(version11Dir.exists());
|
||||
Assert.assertTrue(partition021Dir.exists());
|
||||
Assert.assertTrue(partition012Dir.exists());
|
||||
|
||||
killer.kill(getSegmentWithPath(new File(partition021Dir, "index.zip").toString()));
|
||||
|
||||
Assert.assertFalse(interval1Dir.exists());
|
||||
Assert.assertTrue(partition012Dir.exists());
|
||||
|
||||
killer.kill(getSegmentWithPath(new File(partition012Dir, "index.zip").toString()));
|
||||
|
||||
Assert.assertFalse(dataSourceDir.exists());
|
||||
}
|
||||
|
||||
private void makePartitionDirWithIndex(File path) throws IOException
|
||||
{
|
||||
Assert.assertTrue(path.mkdirs());
|
||||
Assert.assertTrue(new File(path, "index.zip").createNewFile());
|
||||
}
|
||||
|
||||
private DataSegment getSegmentWithPath(String path)
|
||||
{
|
||||
return new DataSegment(
|
||||
"dataSource",
|
||||
Interval.parse("2000/3000"),
|
||||
"ver",
|
||||
ImmutableMap.<String, Object>of(
|
||||
"type", "local",
|
||||
"path", path
|
||||
),
|
||||
ImmutableList.of("product"),
|
||||
ImmutableList.of("visited_sum", "unique_hosts"),
|
||||
new NoneShardSpec(),
|
||||
9,
|
||||
12334
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue