fix delegated smoosh writer and some new facilities for segment writeout medium (#12132)

* fix delegated smoosh writer and some new facilities for segment writeout medium
changes:
* fixed an issue with the delegated `SmooshedWriter` when writing files whose names look like paths (e.g. contain `/`), which caused `NoSuchFileException` when attempting to open a channel to the temporary file
* when _not_ delegating, the `SmooshedWriter` returned by `FileSmoosher.addWithSmooshedWriter` now checks that it is still open when closing, making `close()` a no-op if already closed (this lets column serializers finish writing their own content, close the writer, and then add additional files without triggering delegated mode; see the usage sketch after this list)
* added `makeChildWriteOutMedium` to the `SegmentWriteOutMedium` interface, which allows users of a shared medium to free their `WriteOutBytes` early if they fully control the lifecycle; there are no callers of this yet, it is added for future functionality
* `HeapByteBufferWriteOutBytes` can now be marked as not open, so `OnHeapMemorySegmentWriteOutMedium` now behaves identically to the other medium implementations

* fix to address nit - use AtomicLong
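Below is a minimal usage sketch (not part of this patch; the directory and entry names are made up) showing the two behaviors described above: an entry added through the delegated path while another writer is open, using a name that looks like a path, and a redundant `close()` on the non-delegated writer that is now a no-op.

```java
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;

import java.io.File;
import java.nio.ByteBuffer;

public class SmooshPathyNameSketch
{
  public static void main(String[] args) throws Exception
  {
    File baseDir = new File("/tmp/smoosh-sketch");   // hypothetical scratch directory
    baseDir.mkdirs();

    try (FileSmoosher smoosher = new FileSmoosher(baseDir, 1 << 20)) {
      // a writer is open, so anything else added before it closes goes through the
      // delegated SmooshedWriter path
      SmooshedWriter writer = smoosher.addWithSmooshedWriter("foo/bar/col", 4);
      writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));

      // previously the delegated writer created new File(baseDir, "foo/bar/extra") and
      // failed with NoSuchFileException because "foo/bar" does not exist under baseDir;
      // it now writes to a counter-named temporary file and maps it back to the logical name
      File tmpFile = File.createTempFile("smoosh-sketch", ".bin");
      Files.write(Ints.toByteArray(7), tmpFile);
      smoosher.add("foo/bar/extra", tmpFile);
      tmpFile.delete();

      // close() on the non-delegated writer is now a no-op once it has already run,
      // so a serializer can close its writer and keep adding files through the smoosher
      writer.close();
      writer.close();
    }

    try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) {
      System.out.println(mapper.mapFile("foo/bar/col").getInt());    // 19
      System.out.println(mapper.mapFile("foo/bar/extra").getInt());  // 7
    }
  }
}
```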
Clint Wylie 2022-01-10 22:25:19 -08:00 committed by GitHub
parent c8ddf60851
commit 7cf9192765
9 changed files with 253 additions and 23 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.java.util.common.io.smoosh;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
@ -46,9 +47,11 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* A class that concatenates files together into configurable sized chunks,
@ -82,6 +85,10 @@ public class FileSmoosher implements Closeable
private List<File> completedFiles = new ArrayList<>();
// list of files in process writing content using delegated smooshedWriter.
private List<File> filesInProcess = new ArrayList<>();
// delegated smooshedWriter creates a new temporary file per file added. we use a counter to name these temporary
// files, and map the file number to the file name so we don't have to escape the file names (e.g. names with '/')
private AtomicLong delegateFileCounter = new AtomicLong(0);
private Map<String, String> delegateFileNameMap;
private Outer currOut = null;
private boolean writerCurrentlyInUse = false;
@ -100,6 +107,7 @@ public class FileSmoosher implements Closeable
{
this.baseDir = baseDir;
this.maxChunkSize = maxChunkSize;
this.delegateFileNameMap = new HashMap<>();
Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value.");
}
@ -223,18 +231,20 @@ public class FileSmoosher implements Closeable
@Override
public void close() throws IOException
{
- open = false;
- internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset()));
- writerCurrentlyInUse = false;
- if (bytesWritten != currOut.getCurrOffset() - startOffset) {
-   throw new ISE("Perhaps there is some concurrent modification going on?");
- }
- if (bytesWritten != size) {
-   throw new IOE("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten);
- }
- // Merge temporary files on to the main smoosh file.
- mergeWithSmoosher();
+ if (open) {
+   open = false;
+   internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset()));
+   writerCurrentlyInUse = false;
+   if (bytesWritten != currOut.getCurrOffset() - startOffset) {
+     throw new ISE("Perhaps there is some concurrent modification going on?");
+   }
+   if (bytesWritten != size) {
+     throw new IOE("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten);
+   }
+   // Merge temporary files on to the main smoosh file.
+   mergeWithSmoosher();
+ }
}
};
}
@ -249,9 +259,11 @@ public class FileSmoosher implements Closeable
{
// Get processed elements from the stack and write.
List<File> fileToProcess = new ArrayList<>(completedFiles);
+ Map<String, String> fileNameMap = ImmutableMap.copyOf(delegateFileNameMap);
completedFiles = new ArrayList<>();
+ delegateFileNameMap = new HashMap<>();
for (File file : fileToProcess) {
- add(file);
+ add(fileNameMap.get(file.getName()), file);
if (!file.delete()) {
LOG.warn("Unable to delete file [%s]", file);
}
@ -272,7 +284,8 @@ public class FileSmoosher implements Closeable
*/
private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException
{
- final File tmpFile = new File(baseDir, name);
+ final String delegateName = getDelegateFileName(name);
+ final File tmpFile = new File(baseDir, delegateName);
filesInProcess.add(tmpFile);
return new SmooshedWriter()
@ -342,6 +355,13 @@ public class FileSmoosher implements Closeable
}
private String getDelegateFileName(String name)
{
final String delegateName = String.valueOf(delegateFileCounter.getAndIncrement());
delegateFileNameMap.put(delegateName, name);
return delegateName;
}
@Override
public void close() throws IOException
{
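The `getDelegateFileName` method shown above implements the naming scheme described in the comment on the new fields; the following stand-alone sketch (a hypothetical class, not the actual `FileSmoosher`, with field and method names mirroring it) illustrates how a "pathy" entry name maps to a flat, counter-based temporary file name and back:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the counter-based delegate file naming used above.
class DelegateNamingSketch
{
  private final AtomicLong delegateFileCounter = new AtomicLong(0);
  private final Map<String, String> delegateFileNameMap = new HashMap<>();

  // "foo/bar/0" -> temporary file "0", "foo/bar/1" -> temporary file "1", ...
  // so the temporary file name never contains path separators that would need escaping
  String getDelegateFileName(String name)
  {
    final String delegateName = String.valueOf(delegateFileCounter.getAndIncrement());
    delegateFileNameMap.put(delegateName, name);
    return delegateName;
  }

  // at merge time (mergeWithSmoosher) the temporary file's name is looked up to recover
  // the logical entry name, mirroring add(fileNameMap.get(file.getName()), file) above
  String logicalNameOf(String delegateFileName)
  {
    return delegateFileNameMap.get(delegateFileName);
  }
}
```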

View File

@ -112,6 +112,27 @@ public class SmooshedFileMapperTest
validateOutput(baseDir);
}
@Test
public void testWhenWithPathyLookingFileNames() throws Exception
{
String prefix = "foo/bar/";
File baseDir = folder.newFolder("base");
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(StringUtils.format("%s%d", prefix, 19), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
for (int i = 0; i < 19; ++i) {
File tmpFile = File.createTempFile(StringUtils.format("smoosh-%s", i), ".bin");
Files.write(Ints.toByteArray(i), tmpFile);
smoosher.add(StringUtils.format("%s%d", prefix, i), tmpFile);
tmpFile.delete();
}
writer.close();
}
validateOutput(baseDir, prefix);
}
@Test
public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Exception
{
@ -193,6 +214,11 @@ public class SmooshedFileMapperTest
}
private void validateOutput(File baseDir) throws IOException
{
validateOutput(baseDir, "");
}
private void validateOutput(File baseDir, String prefix) throws IOException
{
File[] files = baseDir.listFiles();
Arrays.sort(files);
@ -205,7 +231,7 @@ public class SmooshedFileMapperTest
try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) {
for (int i = 0; i < 20; ++i) {
- ByteBuffer buf = mapper.mapFile(StringUtils.format("%d", i));
+ ByteBuffer buf = mapper.mapFile(StringUtils.format("%s%d", prefix, i));
Assert.assertEquals(0, buf.position());
Assert.assertEquals(4, buf.remaining());
Assert.assertEquals(4, buf.capacity());

View File

@ -47,6 +47,7 @@ public abstract class ByteBufferWriteOutBytes extends WriteOutBytes
ByteBuffer headBuffer;
long size;
long capacity;
boolean open = true;
ByteBufferWriteOutBytes()
{
@ -253,7 +254,17 @@ public abstract class ByteBufferWriteOutBytes extends WriteOutBytes
@Override
public boolean isOpen()
{
- return true;
+ return open;
}
+ public void free()
+ {
+   open = false;
+   buffers.clear();
+   headBufferIndex = -1;
+   headBuffer = null;
+   size = 0;
+   capacity = 0;
+ }
private void checkOpen()
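A minimal sketch of the new `free()` behavior, assuming `HeapByteBufferWriteOutBytes` (the heap-backed subclass) can be instantiated directly; normally it is obtained through `OnHeapMemorySegmentWriteOutMedium`, which now registers `free()` with its closer:

```java
import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;

public class WriteOutBytesFreeSketch
{
  public static void main(String[] args) throws Exception
  {
    HeapByteBufferWriteOutBytes bytes = new HeapByteBufferWriteOutBytes();
    bytes.writeInt(42);
    System.out.println(bytes.isOpen());   // true

    // free() drops the buffers and clears the open flag, so the heap-backed
    // implementation now reports isOpen() == false after cleanup, just like the
    // direct (off-heap) implementation already did
    bytes.free();
    System.out.println(bytes.isOpen());   // false
  }
}
```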

View File

@ -25,8 +25,6 @@ import java.nio.ByteBuffer;
final class DirectByteBufferWriteOutBytes extends ByteBufferWriteOutBytes
{
- private boolean open = true;
@Override
protected ByteBuffer allocateBuffer()
{
@ -34,12 +32,7 @@ final class DirectByteBufferWriteOutBytes extends ByteBufferWriteOutBytes
}
@Override
- public boolean isOpen()
- {
-   return open;
- }
- void free()
+ public void free()
{
open = false;
buffers.forEach(ByteBufferUtils::free);

View File

@ -35,6 +35,14 @@ public final class OffHeapMemorySegmentWriteOutMedium implements SegmentWriteOut
return writeOutBytes;
}
@Override
public SegmentWriteOutMedium makeChildWriteOutMedium()
{
OffHeapMemorySegmentWriteOutMedium medium = new OffHeapMemorySegmentWriteOutMedium();
closer.register(medium);
return medium;
}
@Override
public Closer getCloser()
{

View File

@ -32,7 +32,17 @@ public final class OnHeapMemorySegmentWriteOutMedium implements SegmentWriteOutM
@Override
public WriteOutBytes makeWriteOutBytes()
{
- return new HeapByteBufferWriteOutBytes();
+ HeapByteBufferWriteOutBytes writeOutBytes = new HeapByteBufferWriteOutBytes();
+ closer.register(writeOutBytes::free);
+ return writeOutBytes;
}
+ @Override
+ public SegmentWriteOutMedium makeChildWriteOutMedium()
+ {
+   OnHeapMemorySegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
+   closer.register(medium);
+   return medium;
+ }
@Override

View File

@ -39,6 +39,18 @@ public interface SegmentWriteOutMedium extends Closeable
*/
WriteOutBytes makeWriteOutBytes() throws IOException;
/**
* Creates a 'child' version of the {@link SegmentWriteOutMedium}, which can be optionally closed,
* independent of this {@link SegmentWriteOutMedium} but otherwise shares the same configuration. This allows callers
* using a shared {@link SegmentWriteOutMedium} but which control the complete lifecycle of the {@link WriteOutBytes}
* which they require to free the backing resources when they are finished, rather than waiting until
* {@link #close()} is called for this medium.
*
* The 'child' medium will be closed when {@link #close()} is called, if not called explicitly prior to closing this
* medium.
*/
SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException;
/**
* Returns a closer of this SegmentWriteOutMedium, which is closed in this SegmentWriteOutMedium's close() method.
* Could be used to "attach" some random resources to this SegmentWriteOutMedium, to be closed at the same time.
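The javadoc above describes the intended lifecycle of the new `makeChildWriteOutMedium`; the following sketch (a hypothetical serializer with illustrative names, not code from this patch) shows how a caller sharing a medium could use a child medium to free its intermediate `WriteOutBytes` early:

```java
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes;

import java.io.IOException;
import java.nio.ByteBuffer;

public class ChildMediumSketch
{
  // Hypothetical helper: writes some intermediate data into a child medium and frees it
  // as soon as the data has been consumed, without closing the shared parent medium.
  static void writeIntermediate(SegmentWriteOutMedium sharedMedium) throws IOException
  {
    SegmentWriteOutMedium childMedium = sharedMedium.makeChildWriteOutMedium();
    try {
      WriteOutBytes scratch = childMedium.makeWriteOutBytes();
      scratch.write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
      // ... read the scratch bytes back and produce the final output ...
    }
    finally {
      // frees the child's WriteOutBytes now; the parent medium and any WriteOutBytes
      // handed out by it stay open. If this close were skipped, the child would still
      // be closed when the parent medium is eventually closed.
      childMedium.close();
    }
  }
}
```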

View File

@ -54,6 +54,14 @@ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium
return new FileWriteOutBytes(file, ch);
}
@Override
public SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException
{
TmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new TmpFileSegmentWriteOutMedium(dir);
closer.register(tmpFileSegmentWriteOutMedium);
return tmpFileSegmentWriteOutMedium;
}
@Override
public Closer getCloser()
{

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.writeout;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.io.Closer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
@RunWith(Parameterized.class)
public class SegmentWriteOutMediumTest
{
@Parameterized.Parameters(name = "medium = {0}")
public static Iterable<?> constructorFeeder()
{
return ImmutableList.of(
TmpFileSegmentWriteOutMediumFactory.instance(),
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
OnHeapMemorySegmentWriteOutMediumFactory.instance()
);
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private SegmentWriteOutMediumFactory factory;
private SegmentWriteOutMedium medium;
public SegmentWriteOutMediumTest(SegmentWriteOutMediumFactory factory)
{
this.factory = factory;
}
@Before
public void setup() throws IOException
{
this.medium = factory.makeSegmentWriteOutMedium(temporaryFolder.newFolder());
}
@Test
public void testSanity() throws IOException
{
WriteOutBytes bytes1 = medium.makeWriteOutBytes();
WriteOutBytes bytes2 = medium.makeWriteOutBytes();
Assert.assertTrue(bytes1.isOpen());
Assert.assertTrue(bytes2.isOpen());
Closer closer = medium.getCloser();
closer.close();
Assert.assertFalse(bytes1.isOpen());
Assert.assertFalse(bytes2.isOpen());
}
@Test
public void testChildCloseFreesResourcesButNotParents() throws IOException
{
WriteOutBytes bytes1 = medium.makeWriteOutBytes();
WriteOutBytes bytes2 = medium.makeWriteOutBytes();
Assert.assertTrue(bytes1.isOpen());
Assert.assertTrue(bytes2.isOpen());
SegmentWriteOutMedium childMedium = medium.makeChildWriteOutMedium();
Assert.assertTrue(childMedium.getClass().equals(medium.getClass()));
WriteOutBytes bytes3 = childMedium.makeWriteOutBytes();
WriteOutBytes bytes4 = childMedium.makeWriteOutBytes();
Assert.assertTrue(bytes3.isOpen());
Assert.assertTrue(bytes4.isOpen());
Closer childCloser = childMedium.getCloser();
childCloser.close();
Assert.assertFalse(bytes3.isOpen());
Assert.assertFalse(bytes4.isOpen());
Assert.assertTrue(bytes1.isOpen());
Assert.assertTrue(bytes2.isOpen());
Closer closer = medium.getCloser();
closer.close();
Assert.assertFalse(bytes1.isOpen());
Assert.assertFalse(bytes2.isOpen());
}
@Test
public void testChildNotClosedExplicitlyIsClosedByParent() throws IOException
{
WriteOutBytes bytes1 = medium.makeWriteOutBytes();
WriteOutBytes bytes2 = medium.makeWriteOutBytes();
Assert.assertTrue(bytes1.isOpen());
Assert.assertTrue(bytes2.isOpen());
SegmentWriteOutMedium childMedium = medium.makeChildWriteOutMedium();
Assert.assertTrue(childMedium.getClass().equals(medium.getClass()));
WriteOutBytes bytes3 = childMedium.makeWriteOutBytes();
WriteOutBytes bytes4 = childMedium.makeWriteOutBytes();
Assert.assertTrue(bytes3.isOpen());
Assert.assertTrue(bytes4.isOpen());
Closer closer = medium.getCloser();
closer.close();
Assert.assertFalse(bytes1.isOpen());
Assert.assertFalse(bytes2.isOpen());
Assert.assertFalse(bytes3.isOpen());
Assert.assertFalse(bytes4.isOpen());
}
}