mirror of
https://github.com/apache/druid.git
synced 2025-02-17 15:35:56 +00:00
clean tmp file when index merge fail
This commit is contained in:
parent
041350c31b
commit
57d78d3293
@ -34,6 +34,7 @@ import com.google.common.collect.Ordering;
|
|||||||
import com.google.common.collect.PeekingIterator;
|
import com.google.common.collect.PeekingIterator;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
|
import com.google.common.io.Closer;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import com.google.common.io.OutputSupplier;
|
import com.google.common.io.OutputSupplier;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
@ -85,6 +86,7 @@ import org.joda.time.DateTime;
|
|||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -604,11 +606,28 @@ public class IndexMerger
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Closer closer = Closer.create();
|
||||||
final Interval dataInterval;
|
final Interval dataInterval;
|
||||||
File v8OutDir = new File(outDir, "v8-tmp");
|
final File v8OutDir = new File(outDir, "v8-tmp");
|
||||||
v8OutDir.mkdirs();
|
v8OutDir.mkdirs();
|
||||||
|
closer.register(new Closeable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
FileUtils.deleteDirectory(v8OutDir);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
final IOPeon ioPeon = new TmpFileIOPeon();
|
||||||
|
closer.register(new Closeable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
ioPeon.cleanup();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
/************* Main index.drd file **************/
|
/************* Main index.drd file **************/
|
||||||
progress.progress();
|
progress.progress();
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
@ -640,7 +659,6 @@ public class IndexMerger
|
|||||||
progress.progress();
|
progress.progress();
|
||||||
startTime = System.currentTimeMillis();
|
startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
IOPeon ioPeon = new TmpFileIOPeon();
|
|
||||||
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
|
||||||
Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
|
Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
|
||||||
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
|
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
|
||||||
@ -1007,10 +1025,12 @@ public class IndexMerger
|
|||||||
);
|
);
|
||||||
|
|
||||||
indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
|
indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
|
||||||
FileUtils.deleteDirectory(v8OutDir);
|
|
||||||
|
|
||||||
return outDir;
|
return outDir;
|
||||||
}
|
}
|
||||||
|
finally {
|
||||||
|
closer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected Iterable<Rowboat> makeRowIterable(
|
protected Iterable<Rowboat> makeRowIterable(
|
||||||
List<IndexableAdapter> indexes,
|
List<IndexableAdapter> indexes,
|
||||||
|
@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
|
|||||||
import com.google.common.collect.Ordering;
|
import com.google.common.collect.Ordering;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
|
import com.google.common.io.Closer;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
@ -74,6 +75,7 @@ import org.joda.time.DateTime;
|
|||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -144,12 +146,29 @@ public class IndexMergerV9 extends IndexMerger
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Closer closer = Closer.create();
|
||||||
final IOPeon ioPeon = new TmpFileIOPeon(false);
|
final IOPeon ioPeon = new TmpFileIOPeon(false);
|
||||||
|
closer.register(new Closeable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
ioPeon.cleanup();
|
||||||
|
}
|
||||||
|
});
|
||||||
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
|
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
|
||||||
final File v9TmpDir = new File(outDir, "v9-tmp");
|
final File v9TmpDir = new File(outDir, "v9-tmp");
|
||||||
v9TmpDir.mkdirs();
|
v9TmpDir.mkdirs();
|
||||||
|
closer.register(new Closeable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
FileUtils.deleteDirectory(v9TmpDir);
|
||||||
|
}
|
||||||
|
});
|
||||||
log.info("Start making v9 index files, outDir:%s", outDir);
|
log.info("Start making v9 index files, outDir:%s", outDir);
|
||||||
|
try {
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
ByteStreams.write(
|
ByteStreams.write(
|
||||||
Ints.toByteArray(IndexIO.V9_VERSION),
|
Ints.toByteArray(IndexIO.V9_VERSION),
|
||||||
@ -235,12 +254,14 @@ public class IndexMergerV9 extends IndexMerger
|
|||||||
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
|
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
|
||||||
|
|
||||||
v9Smoosher.close();
|
v9Smoosher.close();
|
||||||
ioPeon.cleanup();
|
|
||||||
FileUtils.deleteDirectory(v9TmpDir);
|
|
||||||
progress.stop();
|
progress.stop();
|
||||||
|
|
||||||
return outDir;
|
return outDir;
|
||||||
}
|
}
|
||||||
|
finally {
|
||||||
|
closer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void makeMetadataBinary(
|
private void makeMetadataBinary(
|
||||||
final FileSmoosher v9Smoosher,
|
final FileSmoosher v9Smoosher,
|
||||||
|
@ -37,6 +37,7 @@ import io.druid.query.aggregation.AggregatorFactory;
|
|||||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||||
import io.druid.segment.column.Column;
|
import io.druid.segment.column.Column;
|
||||||
|
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||||
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
|
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
|
||||||
import io.druid.segment.data.BitmapSerdeFactory;
|
import io.druid.segment.data.BitmapSerdeFactory;
|
||||||
import io.druid.segment.data.CompressedObjectStrategy;
|
import io.druid.segment.data.CompressedObjectStrategy;
|
||||||
@ -67,7 +68,6 @@ import java.nio.IntBuffer;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@ -1662,4 +1662,34 @@ public class IndexMergerTest
|
|||||||
Assert.assertEquals(2, dictIdSeeker.seek(4));
|
Assert.assertEquals(2, dictIdSeeker.seek(4));
|
||||||
Assert.assertEquals(-1, dictIdSeeker.seek(5));
|
Assert.assertEquals(-1, dictIdSeeker.seek(5));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testCloser() throws Exception
|
||||||
|
{
|
||||||
|
final long timestamp = System.currentTimeMillis();
|
||||||
|
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
|
||||||
|
IncrementalIndexTest.populateIndex(timestamp, toPersist);
|
||||||
|
ColumnCapabilitiesImpl capabilities = (ColumnCapabilitiesImpl) toPersist.getCapabilities("dim1");
|
||||||
|
capabilities.setHasSpatialIndexes(true);
|
||||||
|
|
||||||
|
final File tempDir = temporaryFolder.newFolder();
|
||||||
|
final File v8TmpDir = new File(tempDir, "v8-tmp");
|
||||||
|
final File v9TmpDir = new File(tempDir, "v9-tmp");
|
||||||
|
|
||||||
|
try {
|
||||||
|
INDEX_MERGER.persist(
|
||||||
|
toPersist,
|
||||||
|
tempDir,
|
||||||
|
indexSpec
|
||||||
|
);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
if (v8TmpDir.exists()) {
|
||||||
|
Assert.fail("v8-tmp dir not clean.");
|
||||||
|
}
|
||||||
|
if (v9TmpDir.exists()) {
|
||||||
|
Assert.fail("v9-tmp dir not clean.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user