cleanup temp files for nested column serializer (#15236)

* cleanup temp files for nested column serializer

* fix style

* fix tests in default value mode
This commit is contained in:
Clint Wylie 2023-10-24 15:30:00 -07:00 committed by GitHub
parent 63e3e9531d
commit 4149c9422c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 281 additions and 52 deletions

View File

@ -430,8 +430,12 @@ public class FileUtils
*
* @throws IllegalStateException if the directory could not be created
*/
@SuppressForbidden(reason = "Files#createTempDirectory")
public static File createTempDir(@Nullable final String prefix)
{
return createTempDirInLocation(getTempDir(), prefix);
}
public static Path getTempDir()
{
final String parentDirectory = System.getProperty("java.io.tmpdir");
@ -439,24 +443,29 @@ public class FileUtils
// Not expected.
throw new ISE("System property java.io.tmpdir is not set, cannot create temporary directories");
}
return new File(parentDirectory).toPath();
}
@SuppressForbidden(reason = "Files#createTempDirectory")
public static File createTempDirInLocation(final Path parentDirectory, @Nullable final String prefix)
{
try {
final Path tmpPath = Files.createTempDirectory(
new File(parentDirectory).toPath(),
parentDirectory,
prefix == null || prefix.isEmpty() ? "druid" : prefix
);
return tmpPath.toFile();
}
catch (IOException e) {
// Some inspection to improve error messages.
if (e instanceof NoSuchFileException && !new File(parentDirectory).exists()) {
throw new ISE("java.io.tmpdir (%s) does not exist", parentDirectory);
if (e instanceof NoSuchFileException && !parentDirectory.toFile().exists()) {
throw new ISE("Path [%s] does not exist", parentDirectory);
} else if ((e instanceof FileSystemException && e.getMessage().contains("Read-only file system"))
|| (e instanceof AccessDeniedException)) {
throw new ISE("java.io.tmpdir (%s) is not writable, check permissions", parentDirectory);
throw new ISE("Path [%s] is not writable, check permissions", parentDirectory);
} else {
// Well, maybe it was something else.
throw new ISE(e, "Failed to create temporary directory in java.io.tmpdir (%s)", parentDirectory);
throw new ISE(e, "Failed to create temporary directory in path [%s]", parentDirectory);
}
}
}

View File

@ -56,28 +56,35 @@ import java.util.EnumSet;
public final class DictionaryIdLookup implements Closeable
{
private final String name;
private final Path tempBasePath;
@Nullable
private final DictionaryWriter<String> stringDictionaryWriter;
private Path stringDictionaryFile = null;
private SmooshedFileMapper stringBufferMapper = null;
private Indexed<ByteBuffer> stringDictionary = null;
@Nullable
private final DictionaryWriter<Long> longDictionaryWriter;
private Path longDictionaryFile = null;
private MappedByteBuffer longBuffer = null;
private FixedIndexed<Long> longDictionary = null;
@Nullable
private final DictionaryWriter<Double> doubleDictionaryWriter;
private Path doubleDictionaryFile = null;
MappedByteBuffer doubleBuffer = null;
FixedIndexed<Double> doubleDictionary = null;
@Nullable
private final DictionaryWriter<int[]> arrayDictionaryWriter;
private Path arrayDictionaryFile = null;
private MappedByteBuffer arrayBuffer = null;
private FrontCodedIntArrayIndexed arrayDictionary = null;
public DictionaryIdLookup(
String name,
Path tempBasePath,
@Nullable DictionaryWriter<String> stringDictionaryWriter,
@Nullable DictionaryWriter<Long> longDictionaryWriter,
@Nullable DictionaryWriter<Double> doubleDictionaryWriter,
@ -85,6 +92,7 @@ public final class DictionaryIdLookup implements Closeable
)
{
this.name = name;
this.tempBasePath = tempBasePath;
this.stringDictionaryWriter = stringDictionaryWriter;
this.longDictionaryWriter = longDictionaryWriter;
this.doubleDictionaryWriter = doubleDictionaryWriter;
@ -98,16 +106,20 @@ public final class DictionaryIdLookup implements Closeable
// for strings because of this. if other type dictionary writers could potentially use multiple internal files
// in the future, we should transition them to using this approach as well (or build a combination smoosher and
// mapper so that we can have a mutable smoosh)
File stringSmoosh = FileUtils.createTempDir(StringUtils.urlEncode(name) + "__stringTempSmoosh");
File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, StringUtils.urlEncode(name) + "__stringTempSmoosh");
stringDictionaryFile = stringSmoosh.toPath();
final String fileName = NestedCommonFormatColumnSerializer.getInternalFileName(
name,
NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
);
final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
fileName,
stringDictionaryWriter.getSerializedSize()
)) {
try (
final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
fileName,
stringDictionaryWriter.getSerializedSize()
)
) {
stringDictionaryWriter.writeTo(writer, smoosher);
writer.close();
smoosher.close();
@ -134,8 +146,8 @@ public final class DictionaryIdLookup implements Closeable
public int lookupLong(@Nullable Long value)
{
if (longDictionary == null) {
final Path longFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
longBuffer = mapWriter(longFile, longDictionaryWriter);
longDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES).get();
// reset position
longBuffer.position(0);
@ -150,8 +162,8 @@ public final class DictionaryIdLookup implements Closeable
public int lookupDouble(@Nullable Double value)
{
if (doubleDictionary == null) {
final Path doubleFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
doubleBuffer = mapWriter(doubleFile, doubleDictionaryWriter);
doubleDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
doubleDictionary = FixedIndexed.read(
doubleBuffer,
TypeStrategies.DOUBLE,
@ -171,8 +183,8 @@ public final class DictionaryIdLookup implements Closeable
public int lookupArray(@Nullable int[] value)
{
if (arrayDictionary == null) {
final Path arrayFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
arrayBuffer = mapWriter(arrayFile, arrayDictionaryWriter);
arrayDictionaryFile = makeTempFile(name + NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, ByteOrder.nativeOrder()).get();
// reset position
arrayBuffer.position(0);
@ -213,15 +225,19 @@ public final class DictionaryIdLookup implements Closeable
{
if (stringBufferMapper != null) {
stringBufferMapper.close();
deleteTempFile(stringDictionaryFile);
}
if (longBuffer != null) {
ByteBufferUtils.unmap(longBuffer);
deleteTempFile(longDictionaryFile);
}
if (doubleBuffer != null) {
ByteBufferUtils.unmap(doubleBuffer);
deleteTempFile(doubleDictionaryFile);
}
if (arrayBuffer != null) {
ByteBufferUtils.unmap(arrayBuffer);
deleteTempFile(arrayDictionaryFile);
}
}
@ -243,7 +259,22 @@ public final class DictionaryIdLookup implements Closeable
private Path makeTempFile(String name)
{
try {
return Files.createTempFile(StringUtils.urlEncode(name), null);
return Files.createTempFile(tempBasePath, StringUtils.urlEncode(name), null);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private void deleteTempFile(Path path)
{
try {
final File file = path.toFile();
if (file.isDirectory()) {
FileUtils.deleteDirectory(file);
} else {
Files.delete(path);
}
}
catch (IOException e) {
throw new RuntimeException(e);

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
@ -234,6 +235,7 @@ public class NestedDataColumnSerializer extends NestedCommonFormatColumnSerializ
globalDictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
@ -198,6 +199,7 @@ public class NestedDataColumnSerializerV4 implements GenericColumnSerializer<Str
globalDictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -76,6 +77,7 @@ public class ScalarDoubleColumnSerializer extends ScalarNestedCommonFormatColumn
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
null,
null,
dictionaryWriter,

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -77,6 +78,7 @@ public class ScalarLongColumnSerializer extends ScalarNestedCommonFormatColumnSe
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
null,
dictionaryWriter,
null,

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.nested;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@ -71,6 +72,7 @@ public class ScalarStringColumnSerializer extends ScalarNestedCommonFormatColumn
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
dictionaryWriter,
null,
null,

View File

@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -154,6 +155,7 @@ public class VariantColumnSerializer extends NestedCommonFormatColumnSerializer
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
FileUtils.getTempDir(),
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,

View File

@ -26,7 +26,6 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
@ -40,9 +39,6 @@ public class FileUtilsTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testMap() throws IOException
{
@ -137,18 +133,28 @@ public class FileUtilsTest
}
}
@Test
public void testCreateTempDirInLocation() throws IOException
{
final File baseDir = temporaryFolder.newFolder();
File tmp = FileUtils.createTempDirInLocation(baseDir.toPath(), null);
Assert.assertTrue(tmp.getName().startsWith("druid"));
Assert.assertEquals(
baseDir.toPath(),
tmp.getParentFile().toPath()
);
}
@Test
public void testCreateTempDirNonexistentBase()
{
final String oldJavaTmpDir = System.getProperty("java.io.tmpdir");
final String nonExistentDir = oldJavaTmpDir + "/nonexistent";
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(StringUtils.format("java.io.tmpdir (%s) does not exist", nonExistentDir));
final String nonExistentDir = oldJavaTmpDir + "nonexistent";
try {
System.setProperty("java.io.tmpdir", nonExistentDir);
FileUtils.createTempDir();
Throwable e = Assert.assertThrows(IllegalStateException.class, () -> FileUtils.createTempDir());
Assert.assertEquals("Path [" + nonExistentDir + "] does not exist", e.getMessage());
}
finally {
System.setProperty("java.io.tmpdir", oldJavaTmpDir);
@ -159,23 +165,19 @@ public class FileUtilsTest
public void testCreateTempDirUnwritableBase() throws IOException
{
final File baseDir = FileUtils.createTempDir();
final String oldJavaTmpDir = System.getProperty("java.io.tmpdir");
try {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("java.io.tmpdir (" + baseDir + ") is not writable");
final String oldJavaTmpDir = System.getProperty("java.io.tmpdir");
try {
System.setProperty("java.io.tmpdir", baseDir.getPath());
baseDir.setWritable(false);
FileUtils.createTempDir();
}
finally {
System.setProperty("java.io.tmpdir", oldJavaTmpDir);
}
System.setProperty("java.io.tmpdir", baseDir.getPath());
baseDir.setWritable(false);
Throwable e = Assert.assertThrows(IllegalStateException.class, () -> FileUtils.createTempDir());
Assert.assertEquals("Path [" + baseDir + "] is not writable, check permissions", e.getMessage());
}
finally {
baseDir.setWritable(true);
Files.delete(baseDir.toPath());
System.setProperty("java.io.tmpdir", oldJavaTmpDir);
}
}
@ -197,9 +199,11 @@ public class FileUtilsTest
{
final File tmpFile = temporaryFolder.newFile();
expectedException.expect(IOException.class);
expectedException.expectMessage("Cannot create directory");
FileUtils.mkdirp(tmpFile);
Throwable t = Assert.assertThrows(IOException.class, () -> FileUtils.mkdirp(tmpFile));
MatcherAssert.assertThat(
t,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Cannot create directory"))
);
}
@Test
@ -208,17 +212,13 @@ public class FileUtilsTest
final File tmpDir = temporaryFolder.newFolder();
final File testDirectory = new File(tmpDir, "test");
tmpDir.setWritable(false);
try {
final IOException e = Assert.assertThrows(IOException.class, () -> FileUtils.mkdirp(testDirectory));
final IOException e = Assert.assertThrows(IOException.class, () -> FileUtils.mkdirp(testDirectory));
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Cannot create directory"))
);
}
finally {
tmpDir.setWritable(true);
}
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Cannot create directory"))
);
tmpDir.setWritable(true);
// Now it should work.
FileUtils.mkdirp(testDirectory);

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.nested;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.AutoTypeColumnMerger;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.StringEncodingStrategy;
import org.apache.druid.segment.column.TypeStrategies;
import org.apache.druid.segment.data.DictionaryWriter;
import org.apache.druid.segment.data.FixedIndexedWriter;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.file.Path;
public class DictionaryIdLookupTest extends InitializedNullHandlingTest
{
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
@Test
public void testIdLookup() throws IOException
{
// add some values
ValueDictionary dictionary = new ValueDictionary();
dictionary.addStringValue("hello");
dictionary.addStringValue("world");
dictionary.addStringValue(null);
dictionary.addLongValue(123L);
dictionary.addLongValue(-123L);
dictionary.addDoubleValue(1.234);
dictionary.addDoubleValue(0.001);
dictionary.addStringArray(new Object[]{"hello", "world"});
dictionary.addLongArray(new Object[]{1L, 2L, 3L});
dictionary.addDoubleArray(new Object[]{0.01, -1.234, 0.001, 1.234});
// sort them
SortedValueDictionary sortedValueDictionary = dictionary.getSortedCollector();
// setup dictionary writers
SegmentWriteOutMedium medium = TmpFileSegmentWriteOutMediumFactory.instance()
.makeSegmentWriteOutMedium(temp.newFolder());
DictionaryWriter<String> stringWriter = StringEncodingStrategies.getStringDictionaryWriter(
new StringEncodingStrategy.FrontCoded(4, (byte) 1),
medium,
"test"
);
FixedIndexedWriter<Long> longWriter = new FixedIndexedWriter<>(
medium,
TypeStrategies.LONG,
ByteOrder.nativeOrder(),
Long.BYTES,
true
);
FixedIndexedWriter<Double> doubleWriter = new FixedIndexedWriter<>(
medium,
TypeStrategies.DOUBLE,
ByteOrder.nativeOrder(),
Double.BYTES,
true
);
FrontCodedIntArrayIndexedWriter arrayWriter = new FrontCodedIntArrayIndexedWriter(
medium,
ByteOrder.nativeOrder(),
4
);
Path dictTempPath = temp.newFolder().toPath();
// make lookup with references to writers
DictionaryIdLookup idLookup = new DictionaryIdLookup(
"test",
dictTempPath,
stringWriter,
longWriter,
doubleWriter,
arrayWriter
);
// write the stuff
stringWriter.open();
longWriter.open();
doubleWriter.open();
arrayWriter.open();
File tempDir = dictTempPath.toFile();
Assert.assertEquals(0, tempDir.listFiles().length);
for (String s : sortedValueDictionary.getSortedStrings()) {
stringWriter.write(s);
}
for (Long l : sortedValueDictionary.getSortedLongs()) {
longWriter.write(l);
}
for (Double d : sortedValueDictionary.getSortedDoubles()) {
doubleWriter.write(d);
}
Iterable<int[]> sortedArrays = () -> new AutoTypeColumnMerger.ArrayDictionaryMergingIterator(
new Iterable[]{sortedValueDictionary.getSortedArrays()},
idLookup
);
Assert.assertEquals(0, tempDir.listFiles().length);
// looking up some values pulls in string dictionary and long dictionary
Assert.assertEquals(0, idLookup.lookupString(null));
Assert.assertEquals(1, idLookup.lookupString("hello"));
Assert.assertEquals(2, idLookup.lookupString("world"));
Assert.assertEquals(3, idLookup.lookupLong(-123L));
Assert.assertEquals(2, tempDir.listFiles().length);
// writing arrays needs to use the lookups for lower value dictionaries, so will create string, long, and double
// temp dictionary files
for (int[] arr : sortedArrays) {
arrayWriter.write(arr);
}
Assert.assertEquals(3, tempDir.listFiles().length);
if (NullHandling.sqlCompatible()) {
Assert.assertEquals(8, idLookup.lookupDouble(-1.234));
Assert.assertEquals(11, idLookup.lookupDouble(1.234));
Assert.assertEquals(3, tempDir.listFiles().length);
// looking up arrays pulls in array file
Assert.assertEquals(12, idLookup.lookupArray(new int[]{1, 2}));
Assert.assertEquals(13, idLookup.lookupArray(new int[]{4, 5, 6}));
Assert.assertEquals(14, idLookup.lookupArray(new int[]{10, 8, 9, 11}));
Assert.assertEquals(4, tempDir.listFiles().length);
} else {
// default value mode sticks zeros in dictionary even if not present in column because of .. reasons
Assert.assertEquals(9, idLookup.lookupDouble(-1.234));
Assert.assertEquals(13, idLookup.lookupDouble(1.234));
Assert.assertEquals(3, tempDir.listFiles().length);
// looking up arrays pulls in array file
Assert.assertEquals(14, idLookup.lookupArray(new int[]{1, 2}));
Assert.assertEquals(15, idLookup.lookupArray(new int[]{5, 6, 7}));
Assert.assertEquals(16, idLookup.lookupArray(new int[]{12, 9, 11, 13}));
Assert.assertEquals(4, tempDir.listFiles().length);
}
// close it removes all the temp files
idLookup.close();
Assert.assertEquals(0, tempDir.listFiles().length);
}
}