diff --git a/core/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java b/core/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java index 6d4de9378e7..17eb469e949 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java +++ b/core/src/main/java/org/apache/druid/java/util/common/io/smoosh/FileSmoosher.java @@ -21,6 +21,7 @@ package org.apache.druid.java.util.common.io.smoosh; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -46,9 +47,11 @@ import java.nio.charset.StandardCharsets; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; /** * A class that concatenates files together into configurable sized chunks, @@ -82,6 +85,10 @@ public class FileSmoosher implements Closeable private List completedFiles = new ArrayList<>(); // list of files in process writing content using delegated smooshedWriter. private List filesInProcess = new ArrayList<>(); + // delegated smooshedWriter creates a new temporary file per file added. we use a counter to name these temporary + // files, and map the file number to the file name so we don't have to escape the file names (e.g. names with '/') + private AtomicLong delegateFileCounter = new AtomicLong(0); + private Map delegateFileNameMap; private Outer currOut = null; private boolean writerCurrentlyInUse = false; @@ -100,6 +107,7 @@ public class FileSmoosher implements Closeable { this.baseDir = baseDir; this.maxChunkSize = maxChunkSize; + this.delegateFileNameMap = new HashMap<>(); Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value."); } @@ -223,18 +231,20 @@ public class FileSmoosher implements Closeable @Override public void close() throws IOException { - open = false; - internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset())); - writerCurrentlyInUse = false; + if (open) { + open = false; + internalFiles.put(name, new Metadata(currOut.getFileNum(), startOffset, currOut.getCurrOffset())); + writerCurrentlyInUse = false; - if (bytesWritten != currOut.getCurrOffset() - startOffset) { - throw new ISE("Perhaps there is some concurrent modification going on?"); + if (bytesWritten != currOut.getCurrOffset() - startOffset) { + throw new ISE("Perhaps there is some concurrent modification going on?"); + } + if (bytesWritten != size) { + throw new IOE("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten); + } + // Merge temporary files on to the main smoosh file. + mergeWithSmoosher(); } - if (bytesWritten != size) { - throw new IOE("Expected [%,d] bytes, only saw [%,d], potential corruption?", size, bytesWritten); - } - // Merge temporary files on to the main smoosh file. - mergeWithSmoosher(); } }; } @@ -249,9 +259,11 @@ public class FileSmoosher implements Closeable { // Get processed elements from the stack and write. List fileToProcess = new ArrayList<>(completedFiles); + Map fileNameMap = ImmutableMap.copyOf(delegateFileNameMap); completedFiles = new ArrayList<>(); + delegateFileNameMap = new HashMap<>(); for (File file : fileToProcess) { - add(file); + add(fileNameMap.get(file.getName()), file); if (!file.delete()) { LOG.warn("Unable to delete file [%s]", file); } @@ -272,7 +284,8 @@ public class FileSmoosher implements Closeable */ private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException { - final File tmpFile = new File(baseDir, name); + final String delegateName = getDelegateFileName(name); + final File tmpFile = new File(baseDir, delegateName); filesInProcess.add(tmpFile); return new SmooshedWriter() @@ -342,6 +355,13 @@ public class FileSmoosher implements Closeable } + private String getDelegateFileName(String name) + { + final String delegateName = String.valueOf(delegateFileCounter.getAndIncrement()); + delegateFileNameMap.put(delegateName, name); + return delegateName; + } + @Override public void close() throws IOException { diff --git a/core/src/test/java/org/apache/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java b/core/src/test/java/org/apache/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java index b0cd5f6b61d..2be34fe85fe 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/io/smoosh/SmooshedFileMapperTest.java @@ -112,6 +112,27 @@ public class SmooshedFileMapperTest validateOutput(baseDir); } + @Test + public void testWhenWithPathyLookingFileNames() throws Exception + { + String prefix = "foo/bar/"; + File baseDir = folder.newFolder("base"); + + try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) { + final SmooshedWriter writer = smoosher.addWithSmooshedWriter(StringUtils.format("%s%d", prefix, 19), 4); + writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); + + for (int i = 0; i < 19; ++i) { + File tmpFile = File.createTempFile(StringUtils.format("smoosh-%s", i), ".bin"); + Files.write(Ints.toByteArray(i), tmpFile); + smoosher.add(StringUtils.format("%s%d", prefix, i), tmpFile); + tmpFile.delete(); + } + writer.close(); + } + validateOutput(baseDir, prefix); + } + @Test public void testBehaviorWhenReportedSizesLargeAndExceptionIgnored() throws Exception { @@ -193,6 +214,11 @@ public class SmooshedFileMapperTest } private void validateOutput(File baseDir) throws IOException + { + validateOutput(baseDir, ""); + } + + private void validateOutput(File baseDir, String prefix) throws IOException { File[] files = baseDir.listFiles(); Arrays.sort(files); @@ -205,7 +231,7 @@ public class SmooshedFileMapperTest try (SmooshedFileMapper mapper = SmooshedFileMapper.load(baseDir)) { for (int i = 0; i < 20; ++i) { - ByteBuffer buf = mapper.mapFile(StringUtils.format("%d", i)); + ByteBuffer buf = mapper.mapFile(StringUtils.format("%s%d", prefix, i)); Assert.assertEquals(0, buf.position()); Assert.assertEquals(4, buf.remaining()); Assert.assertEquals(4, buf.capacity()); diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/ByteBufferWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/ByteBufferWriteOutBytes.java index 59c8733b226..ec903e8f1a6 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/ByteBufferWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/ByteBufferWriteOutBytes.java @@ -47,6 +47,7 @@ public abstract class ByteBufferWriteOutBytes extends WriteOutBytes ByteBuffer headBuffer; long size; long capacity; + boolean open = true; ByteBufferWriteOutBytes() { @@ -253,7 +254,17 @@ public abstract class ByteBufferWriteOutBytes extends WriteOutBytes @Override public boolean isOpen() { - return true; + return open; + } + + public void free() + { + open = false; + buffers.clear(); + headBufferIndex = -1; + headBuffer = null; + size = 0; + capacity = 0; } private void checkOpen() diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/DirectByteBufferWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/DirectByteBufferWriteOutBytes.java index 41f0b6f0bd9..1a172e3d09e 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/DirectByteBufferWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/DirectByteBufferWriteOutBytes.java @@ -25,8 +25,6 @@ import java.nio.ByteBuffer; final class DirectByteBufferWriteOutBytes extends ByteBufferWriteOutBytes { - private boolean open = true; - @Override protected ByteBuffer allocateBuffer() { @@ -34,12 +32,7 @@ final class DirectByteBufferWriteOutBytes extends ByteBufferWriteOutBytes } @Override - public boolean isOpen() - { - return open; - } - - void free() + public void free() { open = false; buffers.forEach(ByteBufferUtils::free); diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/OffHeapMemorySegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/OffHeapMemorySegmentWriteOutMedium.java index b9edb93bb4e..2340fe0caa8 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/OffHeapMemorySegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/OffHeapMemorySegmentWriteOutMedium.java @@ -35,6 +35,14 @@ public final class OffHeapMemorySegmentWriteOutMedium implements SegmentWriteOut return writeOutBytes; } + @Override + public SegmentWriteOutMedium makeChildWriteOutMedium() + { + OffHeapMemorySegmentWriteOutMedium medium = new OffHeapMemorySegmentWriteOutMedium(); + closer.register(medium); + return medium; + } + @Override public Closer getCloser() { diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/OnHeapMemorySegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/OnHeapMemorySegmentWriteOutMedium.java index 5e154c474a4..cef0a34d2cd 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/OnHeapMemorySegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/OnHeapMemorySegmentWriteOutMedium.java @@ -32,7 +32,17 @@ public final class OnHeapMemorySegmentWriteOutMedium implements SegmentWriteOutM @Override public WriteOutBytes makeWriteOutBytes() { - return new HeapByteBufferWriteOutBytes(); + HeapByteBufferWriteOutBytes writeOutBytes = new HeapByteBufferWriteOutBytes(); + closer.register(writeOutBytes::free); + return writeOutBytes; + } + + @Override + public SegmentWriteOutMedium makeChildWriteOutMedium() + { + OnHeapMemorySegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium(); + closer.register(medium); + return medium; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java index 1fb5371edac..7c2af89b1b6 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/SegmentWriteOutMedium.java @@ -39,6 +39,18 @@ public interface SegmentWriteOutMedium extends Closeable */ WriteOutBytes makeWriteOutBytes() throws IOException; + /** + * Creates a 'child' version of the {@link SegmentWriteOutMedium}, which can be optionally closed, + * independent of this {@link SegmentWriteOutMedium} but otherwise shares the same configuration. This allows callers + * using a shared {@link SegmentWriteOutMedium} but which control the complete lifecycle of the {@link WriteOutBytes} + * which they require to free the backing resources when they are finished, rather than waiting until + * {@link #close()} is called for this medium. + * + * The 'child' medium will be closed when {@link #close()} is called, if not called explicitly prior to closing this + * medium. + */ + SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException; + /** * Returns a closer of this SegmentWriteOutMedium, which is closed in this SegmentWriteOutMedium's close() method. * Could be used to "attach" some random resources to this SegmentWriteOutMedium, to be closed at the same time. diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java index b075a67e04b..007b93aa04b 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/TmpFileSegmentWriteOutMedium.java @@ -54,6 +54,14 @@ public final class TmpFileSegmentWriteOutMedium implements SegmentWriteOutMedium return new FileWriteOutBytes(file, ch); } + @Override + public SegmentWriteOutMedium makeChildWriteOutMedium() throws IOException + { + TmpFileSegmentWriteOutMedium tmpFileSegmentWriteOutMedium = new TmpFileSegmentWriteOutMedium(dir); + closer.register(tmpFileSegmentWriteOutMedium); + return tmpFileSegmentWriteOutMedium; + } + @Override public Closer getCloser() { diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumTest.java new file mode 100644 index 00000000000..c6f188bc468 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/writeout/SegmentWriteOutMediumTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.writeout; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.io.Closer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; + +@RunWith(Parameterized.class) +public class SegmentWriteOutMediumTest +{ + + @Parameterized.Parameters(name = "medium = {0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of( + TmpFileSegmentWriteOutMediumFactory.instance(), + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + OnHeapMemorySegmentWriteOutMediumFactory.instance() + ); + } + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private SegmentWriteOutMediumFactory factory; + private SegmentWriteOutMedium medium; + + public SegmentWriteOutMediumTest(SegmentWriteOutMediumFactory factory) + { + this.factory = factory; + } + + @Before + public void setup() throws IOException + { + this.medium = factory.makeSegmentWriteOutMedium(temporaryFolder.newFolder()); + } + + @Test + public void testSanity() throws IOException + { + WriteOutBytes bytes1 = medium.makeWriteOutBytes(); + WriteOutBytes bytes2 = medium.makeWriteOutBytes(); + + Assert.assertTrue(bytes1.isOpen()); + Assert.assertTrue(bytes2.isOpen()); + + Closer closer = medium.getCloser(); + closer.close(); + + Assert.assertFalse(bytes1.isOpen()); + Assert.assertFalse(bytes2.isOpen()); + } + + @Test + public void testChildCloseFreesResourcesButNotParents() throws IOException + { + WriteOutBytes bytes1 = medium.makeWriteOutBytes(); + WriteOutBytes bytes2 = medium.makeWriteOutBytes(); + + Assert.assertTrue(bytes1.isOpen()); + Assert.assertTrue(bytes2.isOpen()); + + SegmentWriteOutMedium childMedium = medium.makeChildWriteOutMedium(); + Assert.assertTrue(childMedium.getClass().equals(medium.getClass())); + + WriteOutBytes bytes3 = childMedium.makeWriteOutBytes(); + WriteOutBytes bytes4 = childMedium.makeWriteOutBytes(); + + Assert.assertTrue(bytes3.isOpen()); + Assert.assertTrue(bytes4.isOpen()); + + Closer childCloser = childMedium.getCloser(); + childCloser.close(); + + Assert.assertFalse(bytes3.isOpen()); + Assert.assertFalse(bytes4.isOpen()); + + Assert.assertTrue(bytes1.isOpen()); + Assert.assertTrue(bytes2.isOpen()); + + Closer closer = medium.getCloser(); + closer.close(); + + Assert.assertFalse(bytes1.isOpen()); + Assert.assertFalse(bytes2.isOpen()); + } + + @Test + public void testChildNotClosedExplicitlyIsClosedByParent() throws IOException + { + WriteOutBytes bytes1 = medium.makeWriteOutBytes(); + WriteOutBytes bytes2 = medium.makeWriteOutBytes(); + + Assert.assertTrue(bytes1.isOpen()); + Assert.assertTrue(bytes2.isOpen()); + + SegmentWriteOutMedium childMedium = medium.makeChildWriteOutMedium(); + Assert.assertTrue(childMedium.getClass().equals(medium.getClass())); + + WriteOutBytes bytes3 = childMedium.makeWriteOutBytes(); + WriteOutBytes bytes4 = childMedium.makeWriteOutBytes(); + + Assert.assertTrue(bytes3.isOpen()); + Assert.assertTrue(bytes4.isOpen()); + + Closer closer = medium.getCloser(); + closer.close(); + + Assert.assertFalse(bytes1.isOpen()); + Assert.assertFalse(bytes2.isOpen()); + + Assert.assertFalse(bytes3.isOpen()); + Assert.assertFalse(bytes4.isOpen()); + } +}