diff --git a/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java b/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java
index 388112378e4..8209049b844 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/ByteBufferUtils.java
@@ -20,7 +20,6 @@
package org.apache.druid.java.util.common;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
@@ -39,8 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ByteBufferUtils
{
- private static final Logger log = new Logger(ByteBufferUtils.class);
-
// the following MethodHandle lookup code is adapted from Apache Kafka
// https://github.com/apache/kafka/blob/e554dc518eaaa0747899e708160275f95c4e525f/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
index 40509501e97..2ca3944d844 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
@@ -35,6 +35,7 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
@@ -164,13 +165,70 @@ public class FileUtils
*
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
*
- * @throws FileNotFoundException if the {@code file} does not exist
- * @throws IOException if an I/O error occurs
+ * @throws FileNotFoundException if the {@code file} does not exist
+ * @throws IOException if an I/O error occurs
+ * @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
* @see FileChannel#map(FileChannel.MapMode, long, long)
*/
public static MappedByteBufferHandler map(File file) throws IOException
{
- MappedByteBuffer mappedByteBuffer = com.google.common.io.Files.map(file);
+ return map(file, 0, file.length());
+ }
+
+ /**
+ * Maps a region of a file read-only into memory as per
+ * {@link FileChannel#map(FileChannel.MapMode, long, long)}.
+ *
+ * @param file the file to map
+ * @param offset starting offset for the mmap
+ * @param length length for the mmap
+ *
+ * @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
+ *
+ * @throws FileNotFoundException if the {@code file} does not exist
+ * @throws IOException if an I/O error occurs
+ * @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
+ * @see FileChannel#map(FileChannel.MapMode, long, long)
+ */
+ public static MappedByteBufferHandler map(File file, long offset, long length) throws IOException
+ {
+ if (length > Integer.MAX_VALUE) {
+ throw new IAE("Cannot map region larger than %,d bytes", Integer.MAX_VALUE);
+ }
+
+ try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
+ final FileChannel channel = randomAccessFile.getChannel()) {
+ final MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
+ return new MappedByteBufferHandler(mappedByteBuffer);
+ }
+ }
+
+ /**
+ * Maps a region of an already-open file read-only into memory as per
+ * {@link FileChannel#map(FileChannel.MapMode, long, long)}.
+ *
+ * @param randomAccessFile the file to map. The file will not be closed.
+ * @param offset starting offset for the mmap
+ * @param length length for the mmap
+ *
+ * @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code randomAccessFile}
+ *
+ * @throws IOException if an I/O error occurs
+ * @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
+ * @see FileChannel#map(FileChannel.MapMode, long, long)
+ */
+ public static MappedByteBufferHandler map(
+ RandomAccessFile randomAccessFile,
+ long offset,
+ long length
+ ) throws IOException
+ {
+ if (length > Integer.MAX_VALUE) {
+ throw new IAE("Cannot map region larger than %,d bytes", Integer.MAX_VALUE);
+ }
+
+ final FileChannel channel = randomAccessFile.getChannel();
+ final MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
return new MappedByteBufferHandler(mappedByteBuffer);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/MappedByteBufferHandler.java b/core/src/main/java/org/apache/druid/java/util/common/MappedByteBufferHandler.java
index 0b7be00b1e8..f75bb1bb721 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/MappedByteBufferHandler.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/MappedByteBufferHandler.java
@@ -19,16 +19,16 @@
package org.apache.druid.java.util.common;
+import org.apache.druid.collections.ResourceHolder;
+
import java.nio.MappedByteBuffer;
/**
- * Facilitates using try-with-resources with {@link MappedByteBuffer}s which don't implement {@link AutoCloseable}.
- *
- * <p>This interface is a specialization of {@code org.apache.druid.collections.ResourceHandler}.
+ * Facilitates using try-with-resources with {@link MappedByteBuffer}.
*
* @see FileUtils#map
*/
-public final class MappedByteBufferHandler implements AutoCloseable
+public final class MappedByteBufferHandler implements ResourceHolder
{
private final MappedByteBuffer mappedByteBuffer;
@@ -40,6 +40,7 @@ public final class MappedByteBufferHandler implements AutoCloseable
/**
* Returns the wrapped buffer.
*/
+ @Override
public MappedByteBuffer get()
{
return mappedByteBuffer;
diff --git a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
index 2f5feda7f95..f76f2e53d20 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
@@ -59,6 +59,38 @@ public class FileUtilsTest
Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter);
}
+ @Test
+ public void testMapFileTooLarge() throws IOException
+ {
+ File dataFile = temporaryFolder.newFile("data");
+ try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
+ raf.write(42);
+ raf.setLength(1 << 20); // 1 MiB
+ }
+ final IllegalArgumentException e = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> FileUtils.map(dataFile, 0, (long) Integer.MAX_VALUE + 1)
+ );
+ MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Cannot map region larger than"));
+ }
+
+ @Test
+ public void testMapRandomAccessFileTooLarge() throws IOException
+ {
+ File dataFile = temporaryFolder.newFile("data");
+ try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
+ raf.write(42);
+ raf.setLength(1 << 20); // 1 MiB
+ }
+ try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r")) {
+ final IllegalArgumentException e = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> FileUtils.map(raf, 0, (long) Integer.MAX_VALUE + 1)
+ );
+ MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Cannot map region larger than"));
+ }
+ }
+
@Test
public void testWriteAtomically() throws IOException
{
diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
index d739ad601e8..628b7c04167 100644
--- a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
+++ b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java
@@ -19,20 +19,15 @@
package org.apache.druid.frame.allocation;
-import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -239,28 +234,6 @@ public class AppendableMemory implements Closeable
return sz;
}
- /**
- * Write current memory to a channel.
- */
- public void writeTo(final WritableByteChannel channel) throws IOException
- {
- for (int i = 0; i < blockHolders.size(); i++) {
- final ResourceHolder memoryHolder = blockHolders.get(i);
- final WritableMemory memory = memoryHolder.get();
- final int limit = limits.getInt(i);
-
- if (memory.hasByteBuffer()) {
- final ByteBuffer byteBuffer = memory.getByteBuffer().duplicate();
- byteBuffer.limit(Ints.checkedCast(memory.getRegionOffset(limit)));
- byteBuffer.position(Ints.checkedCast(memory.getRegionOffset(0)));
- Channels.writeFully(channel, byteBuffer);
- } else {
- // No implementation currently for Memory without backing ByteBuffer. (It's never needed.)
- throw new UnsupportedOperationException("Cannot write Memory without backing ByteBuffer");
- }
- }
- }
-
/**
* Write current memory to a {@link WritableMemory} buffer.
*/
diff --git a/processing/src/main/java/org/apache/druid/frame/file/FrameFile.java b/processing/src/main/java/org/apache/druid/frame/file/FrameFile.java
index 886d6ec1316..1aab2153365 100644
--- a/processing/src/main/java/org/apache/druid/frame/file/FrameFile.java
+++ b/processing/src/main/java/org/apache/druid/frame/file/FrameFile.java
@@ -19,32 +19,26 @@
package org.apache.druid.frame.file;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
-import org.apache.datasketches.memory.MapHandle;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.frame.Frame;
-import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.MappedByteBufferHandler;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.ReferenceCountingCloseableObject;
import org.apache.druid.utils.CloseableUtils;
-import org.apache.druid.utils.JvmUtils;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteOrder;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.EnumSet;
@@ -63,13 +57,18 @@ import java.util.EnumSet;
*
* - 2 bytes: {@link FrameFileWriter#MAGIC}
* - NNN bytes: sequence of {@link FrameFileWriter#MARKER_FRAME} followed by one compressed frame (see {@link Frame})
+ * - 1 byte: {@link FrameFileWriter#MARKER_NO_MORE_FRAMES}
* - 4 bytes * numPartitions: end frame number of each partition (exclusive), as little-endian ints. Note that
* partitions may be empty. In this case, certain adjacent values in this array will be equal. Only present if the
* file is partitioned.
- * - 1 byte: {@link FrameFileWriter#MARKER_NO_MORE_FRAMES}
* - 8 bytes * numFrames: end of each compressed frame (exclusive), relative to start of file, as little-endian longs
* - 4 bytes: number of frames, as little-endian int
* - 4 bytes: number of partitions, as little-endian int
+ * - 4 bytes: length of footer, from {@link FrameFileWriter#MARKER_NO_MORE_FRAMES} to EOF
+ * - 4 bytes: checksum of footer (xxhash64, truncated to 32 bits), not considering these final 4 bytes
+ *
+ * Instances of this class are not thread-safe. For sharing across threads, use {@link #newReference()} to create
+ * an additional reference.
*/
public class FrameFile implements Closeable
{
@@ -80,29 +79,46 @@ public class FrameFile implements Closeable
/**
* Delete the opened frame file when all references are closed.
*/
- DELETE_ON_CLOSE,
-
- /**
- * Map using ByteBuffer. Used only for testing.
- */
- BB_MEMORY_MAP,
-
- /**
- * Map using DataSketches Memory. Used only for testing.
- */
- DS_MEMORY_MAP
+ DELETE_ON_CLOSE
}
private final File file;
- private final Memory memory;
+ private final long fileLength;
+ private final Memory footerMemory; // Footer is everything from the final MARKER_NO_MORE_FRAMES to EOF.
+ private final int maxMmapSize;
private final int numFrames;
private final int numPartitions;
private final ReferenceCountingCloseableObject referenceCounter;
private final Closeable referenceReleaser;
+ /**
+ * Mapped memory, starting from {@link #bufferOffset} in {@link #file}, up to max of {@link #maxMmapSize}. Acts as
+ * a window on the underlying file. Remapped using {@link #remapBuffer(long)}, freed using {@link #releaseBuffer()}.
+ *
+ * Even though managing multiple buffers requires extra code, we use this instead of {@link Memory#map(File)} for
+ * two reasons:
+ *
+ * - Current version of {@link Memory#map(File)} is not compatible with Java 17.
+ * - Using ByteBuffer-backed Memory enables zero-copy decompression in {@link Frame#decompress}.
+ */
+ private Memory buffer;
+
+ /**
+ * Offset of {@link #buffer} from the start of the file.
+ */
+ private long bufferOffset;
+
+ /**
+ * Runnable that unmaps {@link #buffer}.
+ */
+ private Runnable bufferCloser;
+
private FrameFile(
final File file,
- final Memory memory,
+ final long fileLength,
+ final Memory footerMemory,
+ @Nullable final Memory wholeFileMemory,
+ final int maxMmapSize,
final int numFrames,
final int numPartitions,
final ReferenceCountingCloseableObject referenceCounter,
@@ -110,17 +126,45 @@ public class FrameFile implements Closeable
)
{
this.file = file;
- this.memory = memory;
+ this.fileLength = fileLength;
+ this.footerMemory = footerMemory;
+ this.maxMmapSize = maxMmapSize;
this.numFrames = numFrames;
this.numPartitions = numPartitions;
this.referenceCounter = referenceCounter;
this.referenceReleaser = referenceReleaser;
+
+ if (wholeFileMemory != null) {
+ assert wholeFileMemory.getCapacity() == fileLength;
+
+ // Set buffer, but not bufferCloser; if buffer was passed in constructor, it is shared across references,
+ // and therefore is closed using referenceReleaser.
+ buffer = wholeFileMemory;
+ }
}
/**
* Open a frame file with certain optional flags.
+ *
+ * @param file frame file
+ * @param flags optional flags
*/
public static FrameFile open(final File file, final Flag... flags) throws IOException
+ {
+ return open(file, Integer.MAX_VALUE, flags);
+ }
+
+ /**
+ * Open a frame file with certain optional flags.
+ *
+ * Package-private because this method is intended for use in tests. In production, {@code maxMmapSize} is
+ * set to {@link Integer#MAX_VALUE}.
+ *
+ * @param file frame file
+ * @param maxMmapSize largest buffer to mmap at once
+ * @param flags optional flags
+ */
+ static FrameFile open(final File file, final int maxMmapSize, final Flag... flags) throws IOException
{
final EnumSet flagSet = flags.length == 0 ? EnumSet.noneOf(Flag.class) : EnumSet.copyOf(Arrays.asList(flags));
@@ -128,43 +172,89 @@ public class FrameFile implements Closeable
throw new FileNotFoundException(StringUtils.format("File [%s] not found", file));
}
- final Pair map = mapFile(file, flagSet);
- final Memory memory = map.lhs;
- final Closeable mapCloser = Preconditions.checkNotNull(map.rhs, "closer");
+ // Closer for mmap that is shared across all references: either footer only (if file size is larger than
+ // maxMmapSize) or entire file (if file size is smaller than, or equal to, maxMmapSize).
+ Closeable sharedMapCloser = null;
+
+ try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+ final long fileLength = randomAccessFile.length();
- try {
// Verify minimum file length.
- if (memory.getCapacity() < FrameFileWriter.MAGIC.length + FrameFileWriter.TRAILER_LENGTH) {
- throw new IOE("File [%s] is too short for magic + trailer", file);
+ if (fileLength <
+ FrameFileWriter.MAGIC.length + FrameFileWriter.TRAILER_LENGTH + Byte.BYTES /* MARKER_NO_MORE_FRAMES */) {
+ throw new IOE("File [%s] is too short (size = [%,d])", file, fileLength);
}
// Verify magic.
- if (!memory.equalTo(0, Memory.wrap(FrameFileWriter.MAGIC), 0, FrameFileWriter.MAGIC.length)) {
+ final byte[] buf = new byte[FrameFileWriter.TRAILER_LENGTH /* Larger than FrameFileWriter.MAGIC */];
+ final Memory bufMemory = Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
+ randomAccessFile.readFully(buf, 0, FrameFileWriter.MAGIC.length);
+
+ if (!bufMemory.equalTo(0, Memory.wrap(FrameFileWriter.MAGIC), 0, FrameFileWriter.MAGIC.length)) {
throw new IOE("File [%s] is not a frame file", file);
}
- final int numFrames = memory.getInt(memory.getCapacity() - Integer.BYTES * 2L);
- final int numPartitions = memory.getInt(memory.getCapacity() - Integer.BYTES);
+ // Read number of frames and partitions.
+ randomAccessFile.seek(fileLength - FrameFileWriter.TRAILER_LENGTH);
+ randomAccessFile.readFully(buf, 0, FrameFileWriter.TRAILER_LENGTH);
- // Verify last frame is followed by MARKER_NO_MORE_FRAMES.
- final long endMarkerPosition;
+ final int numFrames = bufMemory.getInt(0);
+ final int numPartitions = bufMemory.getInt(Integer.BYTES);
+ final int footerLength = bufMemory.getInt(Integer.BYTES * 2L);
+ final int expectedFooterChecksum = bufMemory.getInt(Integer.BYTES * 3L);
- if (numFrames > 0) {
- endMarkerPosition = getFrameEndPosition(memory, numFrames - 1, numFrames);
+ if (footerLength < 0) {
+ throw new ISE("Negative-size footer. Corrupt or truncated file?");
+ } else if (footerLength > fileLength) {
+ throw new ISE("Oversize footer. Corrupt or truncated file?");
+ }
+
+ final Memory wholeFileMemory;
+ final Memory footerMemory;
+
+ if (fileLength <= maxMmapSize) {
+ // Map entire file, use region for footer.
+ final MappedByteBufferHandler mapHandle = FileUtils.map(randomAccessFile, 0, fileLength);
+ sharedMapCloser = mapHandle;
+ wholeFileMemory = Memory.wrap(mapHandle.get(), ByteOrder.LITTLE_ENDIAN);
+
+ if (wholeFileMemory.getCapacity() != fileLength) {
+ // Check that the mapped file is the expected length. May differ if the file was updated while we're trying
+ // to map it.
+ throw new ISE("Memory map size does not match file size");
+ }
+
+ footerMemory = wholeFileMemory.region(fileLength - footerLength, footerLength, ByteOrder.LITTLE_ENDIAN);
} else {
- endMarkerPosition = FrameFileWriter.MAGIC.length;
+ // Map footer only. Will map the entire file in pages later, using "remap".
+ final MappedByteBufferHandler footerMapHandle =
+ FileUtils.map(randomAccessFile, fileLength - footerLength, footerLength);
+ sharedMapCloser = footerMapHandle;
+ wholeFileMemory = null;
+ footerMemory = Memory.wrap(footerMapHandle.get(), ByteOrder.LITTLE_ENDIAN);
}
- if (endMarkerPosition >= memory.getCapacity()) {
- throw new IOE("File [%s] end marker location out of range", file);
- }
-
- if (memory.getByte(endMarkerPosition) != FrameFileWriter.MARKER_NO_MORE_FRAMES) {
+ // Verify footer begins with MARKER_NO_MORE_FRAMES.
+ if (footerMemory.getByte(0) != FrameFileWriter.MARKER_NO_MORE_FRAMES) {
throw new IOE("File [%s] end marker not in expected location", file);
}
+ // Verify footer checksum.
+ final int actualChecksum =
+ (int) footerMemory.xxHash64(0, footerMemory.getCapacity() - Integer.BYTES, FrameFileWriter.CHECKSUM_SEED);
+
+ if (expectedFooterChecksum != actualChecksum) {
+ throw new ISE("Expected footer checksum did not match actual checksum. Corrupt or truncated file?");
+ }
+
+ // Verify footer length.
+ if (footerLength != FrameFileWriter.footerLength(numFrames, numPartitions)) {
+ throw new ISE("Expected footer length did not match actual footer length. Corrupt or truncated file?");
+ }
+
+ // Set up closer, refcounter; return instance.
final Closer fileCloser = Closer.create();
- fileCloser.register(mapCloser);
+ fileCloser.register(sharedMapCloser);
if (flagSet.contains(Flag.DELETE_ON_CLOSE)) {
fileCloser.register(() -> {
@@ -177,15 +267,25 @@ public class FrameFile implements Closeable
final ReferenceCountingCloseableObject referenceCounter =
new ReferenceCountingCloseableObject(fileCloser) {};
- return new FrameFile(file, memory, numFrames, numPartitions, referenceCounter, referenceCounter);
+ return new FrameFile(
+ file,
+ fileLength,
+ footerMemory,
+ wholeFileMemory,
+ maxMmapSize,
+ numFrames,
+ numPartitions,
+ referenceCounter,
+ referenceCounter
+ );
}
catch (Throwable e) {
// Close mapCloser, not fileCloser: if there is an error in "open" then we don't delete the file.
if (e instanceof IOException) {
// Don't wrap IOExceptions.
- throw CloseableUtils.closeInCatch((IOException) e, mapCloser);
+ throw CloseableUtils.closeInCatch((IOException) e, sharedMapCloser);
} else {
- throw CloseableUtils.closeAndWrapInCatch(e, mapCloser);
+ throw CloseableUtils.closeAndWrapInCatch(e, sharedMapCloser);
}
}
}
@@ -221,17 +321,12 @@ public class FrameFile implements Closeable
return numFrames;
} else {
final long partitionStartFrameLocation =
- memory.getCapacity()
+ footerMemory.getCapacity()
- FrameFileWriter.TRAILER_LENGTH
- (long) numFrames * Long.BYTES
- (long) (numPartitions - partition) * Integer.BYTES;
- // Bounds check: protect against possibly-corrupt data.
- if (partitionStartFrameLocation < 0 || partitionStartFrameLocation > memory.getCapacity() - Integer.BYTES) {
- throw new ISE("Corrupt frame file: partition [%,d] marker out of range", partition);
- }
-
- return memory.getInt(partitionStartFrameLocation);
+ return footerMemory.getInt(partitionStartFrameLocation);
}
}
@@ -246,29 +341,50 @@ public class FrameFile implements Closeable
throw new IAE("Frame [%,d] out of bounds", frameNumber);
}
- final long frameEnd = getFrameEndPosition(memory, frameNumber, numFrames);
+ final long frameEnd = getFrameEndPosition(frameNumber);
final long frameStart;
if (frameNumber == 0) {
frameStart = FrameFileWriter.MAGIC.length + Byte.BYTES /* MARKER_FRAME */;
} else {
- frameStart = getFrameEndPosition(memory, frameNumber - 1, numFrames) + Byte.BYTES /* MARKER_FRAME */;
+ frameStart = getFrameEndPosition(frameNumber - 1) + Byte.BYTES /* MARKER_FRAME */;
+ }
+
+ if (buffer == null || frameStart < bufferOffset || frameEnd > bufferOffset + buffer.getCapacity()) {
+ remapBuffer(frameStart);
+ }
+
+ if (frameStart < bufferOffset || frameEnd > bufferOffset + buffer.getCapacity()) {
+ // Still out of bounds after remapping successfully: must mean frame was too large to fit in maxMmapSize.
+ throw new ISE("Frame [%,d] too large (max size = %,d bytes)", frameNumber, maxMmapSize);
}
// Decompression is safe even on corrupt data: it validates position, length, checksum.
- return Frame.decompress(memory, frameStart, frameEnd - frameStart);
+ return Frame.decompress(buffer, frameStart - bufferOffset, frameEnd - frameStart);
}
/**
* Creates a new reference to this file. Calling {@link #close()} releases the reference. The original file
* is closed when it, and all additional references, are closed.
+ *
+ * The new FrameFile instance may be used concurrently with the original FrameFile instance.
*/
public FrameFile newReference()
{
final Closeable releaser = referenceCounter.incrementReferenceAndDecrementOnceCloseable()
.orElseThrow(() -> new ISE("Frame file is closed"));
- return new FrameFile(file, memory, numFrames, numPartitions, referenceCounter, releaser);
+ return new FrameFile(
+ file,
+ fileLength,
+ footerMemory,
+ bufferOffset == 0 && bufferCloser == null ? buffer : null, // If bufferCloser is null, buffer is shared
+ maxMmapSize,
+ numFrames,
+ numPartitions,
+ referenceCounter,
+ releaser
+ );
}
/**
@@ -282,7 +398,7 @@ public class FrameFile implements Closeable
@Override
public void close() throws IOException
{
- referenceReleaser.close();
+ CloseableUtils.closeAll(this::releaseBuffer, referenceReleaser);
}
/**
@@ -298,89 +414,65 @@ public class FrameFile implements Closeable
}
}
- /**
- * Maps a file, respecting the flags provided to {@link #open}.
- */
- private static Pair mapFile(final File file, final EnumSet flagSet) throws IOException
+ private long getFrameEndPosition(final int frameNumber)
{
- if (flagSet.contains(Flag.DS_MEMORY_MAP) && flagSet.contains(Flag.BB_MEMORY_MAP)) {
- throw new ISE("Cannot open with both [%s] and [%s]", Flag.DS_MEMORY_MAP, Flag.BB_MEMORY_MAP);
- } else if (flagSet.contains(Flag.DS_MEMORY_MAP)) {
- return mapFileDS(file);
- } else if (flagSet.contains(Flag.BB_MEMORY_MAP)) {
- return mapFileBB(file);
- } else if (file.length() <= Integer.MAX_VALUE) {
- // Prefer using ByteBuffer for small files, because "frame" can use it to avoid a copy when decompressing.
- return mapFileBB(file);
- } else {
- return mapFileDS(file);
- }
- }
+ assert frameNumber >= 0 && frameNumber < numFrames;
- /**
- * Maps a file using a MappedByteBuffer. This is preferred for small files, since it enables zero-copy decompression
- * in {@link #open}.
- */
- private static Pair mapFileBB(final File file) throws IOException
- {
- final MappedByteBuffer byteBuffer = Files.map(file, FileChannel.MapMode.READ_ONLY);
- byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
- return Pair.of(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN), () -> ByteBufferUtils.unmap(byteBuffer));
- }
-
- /**
- * Maps a file using the functionality in datasketches-memory.
- */
- private static Pair mapFileDS(final File file)
- {
- final MapHandle mapHandle;
-
- try {
- mapHandle = Memory.map(file, 0, file.length(), ByteOrder.LITTLE_ENDIAN);
- }
- catch (NoClassDefFoundError | ExceptionInInitializerError e) {
- throw handleMemoryMapError(e, JvmUtils.majorVersion());
- }
-
- return Pair.of(
- mapHandle.get(),
- () -> {
- try {
- mapHandle.close();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- );
- }
-
- @VisibleForTesting
- static RuntimeException handleMemoryMapError(Throwable e, int javaMajorVersion)
- {
- // Memory.map does not work on JDK 14+ due to issues with AllocateDirectMap.
- if (javaMajorVersion >= 14) {
- throw new ISE(
- "Cannot read frame files larger than %,d bytes with Java %d. Try using Java 11.",
- Integer.MAX_VALUE,
- javaMajorVersion
- );
- } else {
- // We don't have a good reason why this happened. Throw the original error.
- throw new RE(e, "Could not map frame file");
- }
- }
-
- private static long getFrameEndPosition(final Memory memory, final int frameNumber, final int numFrames)
- {
final long frameEndPointerPosition =
- memory.getCapacity() - FrameFileWriter.TRAILER_LENGTH - (long) (numFrames - frameNumber) * Long.BYTES;
+ footerMemory.getCapacity() - FrameFileWriter.TRAILER_LENGTH - (long) (numFrames - frameNumber) * Long.BYTES;
+
+ final long frameEndPosition = footerMemory.getLong(frameEndPointerPosition);
// Bounds check: protect against possibly-corrupt data.
- if (frameEndPointerPosition < 0 || frameEndPointerPosition > memory.getCapacity() - Long.BYTES) {
+ if (frameEndPosition < 0 || frameEndPosition > fileLength - footerMemory.getCapacity()) {
throw new ISE("Corrupt frame file: frame [%,d] location out of range", frameNumber);
}
- return memory.getLong(frameEndPointerPosition);
+ return frameEndPosition;
+ }
+
+ /**
+ * Updates {@link #buffer}, {@link #bufferOffset}, and {@link #bufferCloser} to a new offset. Closes the old
+ * buffer, if any.
+ */
+ private void remapBuffer(final long offset)
+ {
+ releaseBuffer();
+
+ if (offset >= fileLength) {
+ throw new IAE("Offset [%,d] out of range for file length [%,d]", offset, fileLength);
+ }
+
+ final MappedByteBufferHandler mapHandle;
+
+ try {
+ mapHandle = FileUtils.map(file, offset, Math.min(fileLength - offset, maxMmapSize));
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ buffer = Memory.wrap(mapHandle.get(), ByteOrder.LITTLE_ENDIAN);
+ bufferCloser = mapHandle::close;
+ bufferOffset = offset;
+ }
+
+ /**
+ * Nulls out {@link #buffer} and {@link #bufferCloser} references.
+ *
+ * Explicitly frees {@link #buffer} if {@link #bufferCloser} is set. If {@link #buffer} is set, but
+ * {@link #bufferCloser} is not set, it is a shared buffer, and is not freed.
+ */
+ private void releaseBuffer()
+ {
+ try {
+ if (bufferCloser != null) {
+ bufferCloser.run();
+ }
+ }
+ finally {
+ buffer = null;
+ bufferCloser = null;
+ }
}
}
diff --git a/processing/src/main/java/org/apache/druid/frame/file/FrameFileWriter.java b/processing/src/main/java/org/apache/druid/frame/file/FrameFileWriter.java
index e73e842ed69..362e4c8fea6 100644
--- a/processing/src/main/java/org/apache/druid/frame/file/FrameFileWriter.java
+++ b/processing/src/main/java/org/apache/druid/frame/file/FrameFileWriter.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
@@ -42,7 +43,8 @@ public class FrameFileWriter implements Closeable
public static final byte[] MAGIC = {(byte) 0xff, 0x01};
public static final byte MARKER_FRAME = (byte) 0x01;
public static final byte MARKER_NO_MORE_FRAMES = (byte) 0x02;
- public static final int TRAILER_LENGTH = Integer.BYTES * 2;
+ public static final int TRAILER_LENGTH = Integer.BYTES * 4;
+ public static final int CHECKSUM_SEED = 0;
public static final int NO_PARTITION = -1;
private final WritableByteChannel channel;
@@ -177,18 +179,33 @@ public class FrameFileWriter implements Closeable
writeMagicIfNeeded();
- if (!tableOfContents.reserveAdditional(TRAILER_LENGTH)) {
+ if (!tableOfContents.reserveAdditional(Integer.BYTES * 3)) {
throw new ISE("Can't finish table of contents");
}
+
final MemoryRange tocCursor = tableOfContents.cursor();
+ final int numPartitions = Ints.checkedCast(partitions.size() / Integer.BYTES);
+
tocCursor.memory().putInt(tocCursor.start(), numFrames);
- tocCursor.memory().putInt(tocCursor.start() + Integer.BYTES, Ints.checkedCast(partitions.size() / Integer.BYTES));
- tableOfContents.advanceCursor(TRAILER_LENGTH);
- channel.write(ByteBuffer.wrap(new byte[]{MARKER_NO_MORE_FRAMES}));
- partitions.writeTo(channel);
+ tocCursor.memory().putInt(tocCursor.start() + Integer.BYTES, numPartitions);
+ tocCursor.memory().putInt(tocCursor.start() + Integer.BYTES * 2L, footerLength(numFrames, numPartitions));
+ tableOfContents.advanceCursor(Integer.BYTES * 3);
+
+ // Buffer up the footer so we can compute its checksum.
+ final ByteBuffer footerBuf = ByteBuffer.allocate(footerLength(numFrames, numPartitions));
+ final WritableMemory footerMemory = WritableMemory.writableWrap(footerBuf, ByteOrder.LITTLE_ENDIAN);
+ assert Byte.BYTES + partitions.size() + tableOfContents.size() + Integer.BYTES == footerMemory.getCapacity();
+ long p = Byte.BYTES;
+ footerMemory.putByte(0, MARKER_NO_MORE_FRAMES);
+ p += partitions.writeTo(footerMemory, p);
partitions.close();
- tableOfContents.writeTo(channel);
+ p += tableOfContents.writeTo(footerMemory, p);
tableOfContents.close();
+ final int checksum = (int) footerMemory.xxHash64(0, p, CHECKSUM_SEED);
+ footerMemory.putInt(p, checksum);
+
+ // Write footer to the channel.
+ Channels.writeFully(channel, footerBuf);
channel.close();
compressionBuffer = null;
closed = true;
@@ -213,4 +230,18 @@ public class FrameFileWriter implements Closeable
return compressionBuffer;
}
+
+ /**
+ * Length of the footer: everything from MARKER_NO_MORE_FRAMES to EOF. See class-level javadoc from {@link FrameFile}
+ * for details on the format.
+ */
+ static int footerLength(final int numFrames, final int numPartitions)
+ {
+ return Ints.checkedCast(
+ Byte.BYTES // MARKER_NO_MORE_FRAMES
+ + (long) Integer.BYTES * numPartitions
+ + (long) Long.BYTES * numFrames
+ + TRAILER_LENGTH
+ );
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/frame/FrameTest.java b/processing/src/test/java/org/apache/druid/frame/FrameTest.java
index 3e2dc14bbea..b7745de0879 100644
--- a/processing/src/test/java/org/apache/druid/frame/FrameTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/FrameTest.java
@@ -23,12 +23,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
-import org.apache.datasketches.memory.MapHandle;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.frame.key.SortColumn;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
-import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
@@ -59,9 +57,7 @@ import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
@RunWith(Enclosed.class)
public class FrameTest
@@ -288,47 +284,6 @@ public class FrameTest
FRAME_DATA_COMPRESSED.length
);
}
- },
- MEMORY_FILE {
- @Override
- Frame wrap(Closer closer) throws IOException
- {
- final File file = File.createTempFile("frame-test", "");
- closer.register(file::delete);
- Files.write(FRAME_DATA, file);
- final MapHandle mapHandle = Memory.map(file);
- closer.register(
- () -> {
- try {
- mapHandle.close();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- );
- return Frame.wrap(mapHandle.get());
- }
-
- @Override
- Frame decompress(Closer closer) throws IOException
- {
- final File file = File.createTempFile("frame-test", "");
- closer.register(file::delete);
- Files.write(FRAME_DATA_COMPRESSED, file);
- final MapHandle mapHandle = Memory.map(file);
- closer.register(
- () -> {
- try {
- mapHandle.close();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- );
- return Frame.decompress(mapHandle.get(), 0, FRAME_DATA_COMPRESSED.length);
- }
};
abstract Frame wrap(Closer closer) throws IOException;
@@ -351,10 +306,7 @@ public class FrameTest
{
final List