mirror of https://github.com/apache/druid.git
FrameFile: Java 17 compatibility. (#12987)
* FrameFile: Java 17 compatibility. DataSketches Memory.map is not Java 17 compatible, and from discussions with the team, is challenging to make compatible with 17 while also retaining compatibility with 8 and 11. So, in this patch, we switch away from Memory.map and instead use the builtin JDK mmap functionality. Since it only supports maps up to Integer.MAX_VALUE, we also implement windowing in FrameFile, such that we can still handle large files. Other changes: 1) Add two new "map" functions to FileUtils, which we use in this patch. 2) Add a footer checksum to the FrameFile format. Individual frames already have checksums, but the footer was missing one. * Changes for static analysis. * wip * Fixes.
This commit is contained in:
parent
f3c47cf68c
commit
2450b96ac8
|
@ -20,7 +20,6 @@
|
||||||
package org.apache.druid.java.util.common;
|
package org.apache.druid.java.util.common;
|
||||||
|
|
||||||
import org.apache.druid.collections.ResourceHolder;
|
import org.apache.druid.collections.ResourceHolder;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
|
||||||
import org.apache.druid.utils.JvmUtils;
|
import org.apache.druid.utils.JvmUtils;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -39,8 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
*/
|
*/
|
||||||
public class ByteBufferUtils
|
public class ByteBufferUtils
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(ByteBufferUtils.class);
|
|
||||||
|
|
||||||
// the following MethodHandle lookup code is adapted from Apache Kafka
|
// the following MethodHandle lookup code is adapted from Apache Kafka
|
||||||
// https://github.com/apache/kafka/blob/e554dc518eaaa0747899e708160275f95c4e525f/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
|
// https://github.com/apache/kafka/blob/e554dc518eaaa0747899e708160275f95c4e525f/clients/src/main/java/org/apache/kafka/common/utils/MappedByteBuffers.java
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.io.FilterOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.MappedByteBuffer;
|
import java.nio.MappedByteBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
|
@ -164,13 +165,70 @@ public class FileUtils
|
||||||
*
|
*
|
||||||
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
|
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
|
||||||
*
|
*
|
||||||
* @throws FileNotFoundException if the {@code file} does not exist
|
* @throws FileNotFoundException if the {@code file} does not exist
|
||||||
* @throws IOException if an I/O error occurs
|
* @throws IOException if an I/O error occurs
|
||||||
|
* @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
|
||||||
* @see FileChannel#map(FileChannel.MapMode, long, long)
|
* @see FileChannel#map(FileChannel.MapMode, long, long)
|
||||||
*/
|
*/
|
||||||
public static MappedByteBufferHandler map(File file) throws IOException
|
public static MappedByteBufferHandler map(File file) throws IOException
|
||||||
{
|
{
|
||||||
MappedByteBuffer mappedByteBuffer = com.google.common.io.Files.map(file);
|
return map(file, 0, file.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fully maps a file read-only in to memory as per
|
||||||
|
* {@link FileChannel#map(FileChannel.MapMode, long, long)}.
|
||||||
|
*
|
||||||
|
* @param file the file to map
|
||||||
|
* @param offset starting offset for the mmap
|
||||||
|
* @param length length for the mmap
|
||||||
|
*
|
||||||
|
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code file}
|
||||||
|
*
|
||||||
|
* @throws FileNotFoundException if the {@code file} does not exist
|
||||||
|
* @throws IOException if an I/O error occurs
|
||||||
|
* @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
|
||||||
|
* @see FileChannel#map(FileChannel.MapMode, long, long)
|
||||||
|
*/
|
||||||
|
public static MappedByteBufferHandler map(File file, long offset, long length) throws IOException
|
||||||
|
{
|
||||||
|
if (length > Integer.MAX_VALUE) {
|
||||||
|
throw new IAE("Cannot map region larger than %,d bytes", Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");
|
||||||
|
final FileChannel channel = randomAccessFile.getChannel()) {
|
||||||
|
final MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
|
||||||
|
return new MappedByteBufferHandler(mappedByteBuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fully maps a file read-only in to memory as per
|
||||||
|
* {@link FileChannel#map(FileChannel.MapMode, long, long)}.
|
||||||
|
*
|
||||||
|
* @param randomAccessFile the file to map. The file will not be closed.
|
||||||
|
* @param offset starting offset for the mmap
|
||||||
|
* @param length length for the mmap
|
||||||
|
*
|
||||||
|
* @return a {@link MappedByteBufferHandler}, wrapping a read-only buffer reflecting {@code randomAccessFile}
|
||||||
|
*
|
||||||
|
* @throws IOException if an I/O error occurs
|
||||||
|
* @throws IllegalArgumentException if length is greater than {@link Integer#MAX_VALUE}
|
||||||
|
* @see FileChannel#map(FileChannel.MapMode, long, long)
|
||||||
|
*/
|
||||||
|
public static MappedByteBufferHandler map(
|
||||||
|
RandomAccessFile randomAccessFile,
|
||||||
|
long offset,
|
||||||
|
long length
|
||||||
|
) throws IOException
|
||||||
|
{
|
||||||
|
if (length > Integer.MAX_VALUE) {
|
||||||
|
throw new IAE("Cannot map region larger than %,d bytes", Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
final FileChannel channel = randomAccessFile.getChannel();
|
||||||
|
final MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
|
||||||
return new MappedByteBufferHandler(mappedByteBuffer);
|
return new MappedByteBufferHandler(mappedByteBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,16 +19,16 @@
|
||||||
|
|
||||||
package org.apache.druid.java.util.common;
|
package org.apache.druid.java.util.common;
|
||||||
|
|
||||||
|
import org.apache.druid.collections.ResourceHolder;
|
||||||
|
|
||||||
import java.nio.MappedByteBuffer;
|
import java.nio.MappedByteBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Facilitates using try-with-resources with {@link MappedByteBuffer}s which don't implement {@link AutoCloseable}.
|
* Facilitates using try-with-resources with {@link MappedByteBuffer}.
|
||||||
*
|
|
||||||
* <p>This interface is a specialization of {@code org.apache.druid.collections.ResourceHandler}.
|
|
||||||
*
|
*
|
||||||
* @see FileUtils#map
|
* @see FileUtils#map
|
||||||
*/
|
*/
|
||||||
public final class MappedByteBufferHandler implements AutoCloseable
|
public final class MappedByteBufferHandler implements ResourceHolder<MappedByteBuffer>
|
||||||
{
|
{
|
||||||
private final MappedByteBuffer mappedByteBuffer;
|
private final MappedByteBuffer mappedByteBuffer;
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@ public final class MappedByteBufferHandler implements AutoCloseable
|
||||||
/**
|
/**
|
||||||
* Returns the wrapped buffer.
|
* Returns the wrapped buffer.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public MappedByteBuffer get()
|
public MappedByteBuffer get()
|
||||||
{
|
{
|
||||||
return mappedByteBuffer;
|
return mappedByteBuffer;
|
||||||
|
|
|
@ -59,6 +59,38 @@ public class FileUtilsTest
|
||||||
Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter);
|
Assert.assertEquals(buffersMemoryBefore, buffersMemoryAfter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMapFileTooLarge() throws IOException
|
||||||
|
{
|
||||||
|
File dataFile = temporaryFolder.newFile("data");
|
||||||
|
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
|
||||||
|
raf.write(42);
|
||||||
|
raf.setLength(1 << 20); // 1 MiB
|
||||||
|
}
|
||||||
|
final IllegalArgumentException e = Assert.assertThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> FileUtils.map(dataFile, 0, (long) Integer.MAX_VALUE + 1)
|
||||||
|
);
|
||||||
|
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Cannot map region larger than"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMapRandomAccessFileTooLarge() throws IOException
|
||||||
|
{
|
||||||
|
File dataFile = temporaryFolder.newFile("data");
|
||||||
|
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
|
||||||
|
raf.write(42);
|
||||||
|
raf.setLength(1 << 20); // 1 MiB
|
||||||
|
}
|
||||||
|
try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r")) {
|
||||||
|
final IllegalArgumentException e = Assert.assertThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> FileUtils.map(raf, 0, (long) Integer.MAX_VALUE + 1)
|
||||||
|
);
|
||||||
|
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.containsString("Cannot map region larger than"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteAtomically() throws IOException
|
public void testWriteAtomically() throws IOException
|
||||||
{
|
{
|
||||||
|
|
|
@ -19,20 +19,15 @@
|
||||||
|
|
||||||
package org.apache.druid.frame.allocation;
|
package org.apache.druid.frame.allocation;
|
||||||
|
|
||||||
import com.google.common.primitives.Ints;
|
|
||||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||||
import it.unimi.dsi.fastutil.ints.IntList;
|
import it.unimi.dsi.fastutil.ints.IntList;
|
||||||
import it.unimi.dsi.fastutil.longs.LongArrayList;
|
import it.unimi.dsi.fastutil.longs.LongArrayList;
|
||||||
import org.apache.datasketches.memory.WritableMemory;
|
import org.apache.datasketches.memory.WritableMemory;
|
||||||
import org.apache.druid.collections.ResourceHolder;
|
import org.apache.druid.collections.ResourceHolder;
|
||||||
import org.apache.druid.io.Channels;
|
|
||||||
import org.apache.druid.java.util.common.IAE;
|
import org.apache.druid.java.util.common.IAE;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.channels.WritableByteChannel;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -239,28 +234,6 @@ public class AppendableMemory implements Closeable
|
||||||
return sz;
|
return sz;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Write current memory to a channel.
|
|
||||||
*/
|
|
||||||
public void writeTo(final WritableByteChannel channel) throws IOException
|
|
||||||
{
|
|
||||||
for (int i = 0; i < blockHolders.size(); i++) {
|
|
||||||
final ResourceHolder<WritableMemory> memoryHolder = blockHolders.get(i);
|
|
||||||
final WritableMemory memory = memoryHolder.get();
|
|
||||||
final int limit = limits.getInt(i);
|
|
||||||
|
|
||||||
if (memory.hasByteBuffer()) {
|
|
||||||
final ByteBuffer byteBuffer = memory.getByteBuffer().duplicate();
|
|
||||||
byteBuffer.limit(Ints.checkedCast(memory.getRegionOffset(limit)));
|
|
||||||
byteBuffer.position(Ints.checkedCast(memory.getRegionOffset(0)));
|
|
||||||
Channels.writeFully(channel, byteBuffer);
|
|
||||||
} else {
|
|
||||||
// No implementation currently for Memory without backing ByteBuffer. (It's never needed.)
|
|
||||||
throw new UnsupportedOperationException("Cannot write Memory without backing ByteBuffer");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write current memory to a {@link WritableMemory} buffer.
|
* Write current memory to a {@link WritableMemory} buffer.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -19,32 +19,26 @@
|
||||||
|
|
||||||
package org.apache.druid.frame.file;
|
package org.apache.druid.frame.file;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.io.Files;
|
|
||||||
import org.apache.datasketches.memory.MapHandle;
|
|
||||||
import org.apache.datasketches.memory.Memory;
|
import org.apache.datasketches.memory.Memory;
|
||||||
import org.apache.druid.frame.Frame;
|
import org.apache.druid.frame.Frame;
|
||||||
import org.apache.druid.java.util.common.ByteBufferUtils;
|
import org.apache.druid.java.util.common.FileUtils;
|
||||||
import org.apache.druid.java.util.common.IAE;
|
import org.apache.druid.java.util.common.IAE;
|
||||||
import org.apache.druid.java.util.common.IOE;
|
import org.apache.druid.java.util.common.IOE;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.Pair;
|
import org.apache.druid.java.util.common.MappedByteBufferHandler;
|
||||||
import org.apache.druid.java.util.common.RE;
|
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.common.io.Closer;
|
import org.apache.druid.java.util.common.io.Closer;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.segment.ReferenceCountingCloseableObject;
|
import org.apache.druid.segment.ReferenceCountingCloseableObject;
|
||||||
import org.apache.druid.utils.CloseableUtils;
|
import org.apache.druid.utils.CloseableUtils;
|
||||||
import org.apache.druid.utils.JvmUtils;
|
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.nio.MappedByteBuffer;
|
|
||||||
import java.nio.channels.FileChannel;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
@ -63,13 +57,18 @@ import java.util.EnumSet;
|
||||||
*
|
*
|
||||||
* - 2 bytes: {@link FrameFileWriter#MAGIC}
|
* - 2 bytes: {@link FrameFileWriter#MAGIC}
|
||||||
* - NNN bytes: sequence of {@link FrameFileWriter#MARKER_FRAME} followed by one compressed frame (see {@link Frame})
|
* - NNN bytes: sequence of {@link FrameFileWriter#MARKER_FRAME} followed by one compressed frame (see {@link Frame})
|
||||||
|
* - 1 byte: {@link FrameFileWriter#MARKER_NO_MORE_FRAMES}
|
||||||
* - 4 bytes * numPartitions: end frame number of each partition (exclusive), as little-endian ints. Note that
|
* - 4 bytes * numPartitions: end frame number of each partition (exclusive), as little-endian ints. Note that
|
||||||
* partitions may be empty. In this case, certain adjacent values in this array will be equal. Only present if the
|
* partitions may be empty. In this case, certain adjacent values in this array will be equal. Only present if the
|
||||||
* file is partitioned.
|
* file is partitioned.
|
||||||
* - 1 byte: {@link FrameFileWriter#MARKER_NO_MORE_FRAMES}
|
|
||||||
* - 8 bytes * numFrames: end of each compressed frame (exclusive), relative to start of file, as little-endian longs
|
* - 8 bytes * numFrames: end of each compressed frame (exclusive), relative to start of file, as little-endian longs
|
||||||
* - 4 bytes: number of frames, as little-endian int
|
* - 4 bytes: number of frames, as little-endian int
|
||||||
* - 4 bytes: number of partitions, as little-endian int
|
* - 4 bytes: number of partitions, as little-endian int
|
||||||
|
* - 4 bytes: length of footer, from {@link FrameFileWriter#MARKER_NO_MORE_FRAMES} to EOF
|
||||||
|
* - 4 bytes: checksum of footer (xxhash64, truncated to 32 bits), not considering these final 4 bytes
|
||||||
|
*
|
||||||
|
* Instances of this class are not thread-safe. For sharing across threads, use {@link #newReference()} to create
|
||||||
|
* an additional reference.
|
||||||
*/
|
*/
|
||||||
public class FrameFile implements Closeable
|
public class FrameFile implements Closeable
|
||||||
{
|
{
|
||||||
|
@ -80,29 +79,46 @@ public class FrameFile implements Closeable
|
||||||
/**
|
/**
|
||||||
* Delete the opened frame file when all references are closed.
|
* Delete the opened frame file when all references are closed.
|
||||||
*/
|
*/
|
||||||
DELETE_ON_CLOSE,
|
DELETE_ON_CLOSE
|
||||||
|
|
||||||
/**
|
|
||||||
* Map using ByteBuffer. Used only for testing.
|
|
||||||
*/
|
|
||||||
BB_MEMORY_MAP,
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Map using DataSketches Memory. Used only for testing.
|
|
||||||
*/
|
|
||||||
DS_MEMORY_MAP
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final File file;
|
private final File file;
|
||||||
private final Memory memory;
|
private final long fileLength;
|
||||||
|
private final Memory footerMemory; // Footer is everything from the final MARKER_NO_MORE_FRAMES to EOF.
|
||||||
|
private final int maxMmapSize;
|
||||||
private final int numFrames;
|
private final int numFrames;
|
||||||
private final int numPartitions;
|
private final int numPartitions;
|
||||||
private final ReferenceCountingCloseableObject<Closeable> referenceCounter;
|
private final ReferenceCountingCloseableObject<Closeable> referenceCounter;
|
||||||
private final Closeable referenceReleaser;
|
private final Closeable referenceReleaser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mapped memory, starting from {@link #bufferOffset} in {@link #file}, up to max of {@link #maxMmapSize}. Acts as
|
||||||
|
* a window on the underlying file. Remapped using {@link #remapBuffer(long)}, freed using {@link #releaseBuffer()}.
|
||||||
|
*
|
||||||
|
* Even though managing multiple buffers requires extra code, we use this instead of {@link Memory#map(File)} for
|
||||||
|
* two reasons:
|
||||||
|
*
|
||||||
|
* - Current version of {@link Memory#map(File)} is not compatible with Java 17.
|
||||||
|
* - Using ByteBuffer-backed Memory enables zero-copy decompression in {@link Frame#decompress}.
|
||||||
|
*/
|
||||||
|
private Memory buffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Offset of {@link #buffer} from the start of the file.
|
||||||
|
*/
|
||||||
|
private long bufferOffset;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runnable that unmaps {@link #buffer}.
|
||||||
|
*/
|
||||||
|
private Runnable bufferCloser;
|
||||||
|
|
||||||
private FrameFile(
|
private FrameFile(
|
||||||
final File file,
|
final File file,
|
||||||
final Memory memory,
|
final long fileLength,
|
||||||
|
final Memory footerMemory,
|
||||||
|
@Nullable final Memory wholeFileMemory,
|
||||||
|
final int maxMmapSize,
|
||||||
final int numFrames,
|
final int numFrames,
|
||||||
final int numPartitions,
|
final int numPartitions,
|
||||||
final ReferenceCountingCloseableObject<Closeable> referenceCounter,
|
final ReferenceCountingCloseableObject<Closeable> referenceCounter,
|
||||||
|
@ -110,17 +126,45 @@ public class FrameFile implements Closeable
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.file = file;
|
this.file = file;
|
||||||
this.memory = memory;
|
this.fileLength = fileLength;
|
||||||
|
this.footerMemory = footerMemory;
|
||||||
|
this.maxMmapSize = maxMmapSize;
|
||||||
this.numFrames = numFrames;
|
this.numFrames = numFrames;
|
||||||
this.numPartitions = numPartitions;
|
this.numPartitions = numPartitions;
|
||||||
this.referenceCounter = referenceCounter;
|
this.referenceCounter = referenceCounter;
|
||||||
this.referenceReleaser = referenceReleaser;
|
this.referenceReleaser = referenceReleaser;
|
||||||
|
|
||||||
|
if (wholeFileMemory != null) {
|
||||||
|
assert wholeFileMemory.getCapacity() == fileLength;
|
||||||
|
|
||||||
|
// Set buffer, but not bufferCloser; if buffer was passed in constructor, it is shared across references,
|
||||||
|
// and therefore is closed using referenceReleaser.
|
||||||
|
buffer = wholeFileMemory;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Open a frame file with certain optional flags.
|
* Open a frame file with certain optional flags.
|
||||||
|
*
|
||||||
|
* @param file ƒrame file
|
||||||
|
* @param flags optional flags
|
||||||
*/
|
*/
|
||||||
public static FrameFile open(final File file, final Flag... flags) throws IOException
|
public static FrameFile open(final File file, final Flag... flags) throws IOException
|
||||||
|
{
|
||||||
|
return open(file, Integer.MAX_VALUE, flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a frame file with certain optional flags.
|
||||||
|
*
|
||||||
|
* Package-private because this method is intended for use in tests. In production, {@code maxMmapSize} is
|
||||||
|
* set to {@link Integer#MAX_VALUE}.
|
||||||
|
*
|
||||||
|
* @param file ƒrame file
|
||||||
|
* @param maxMmapSize largest buffer to mmap at once
|
||||||
|
* @param flags optional flags
|
||||||
|
*/
|
||||||
|
static FrameFile open(final File file, final int maxMmapSize, final Flag... flags) throws IOException
|
||||||
{
|
{
|
||||||
final EnumSet<Flag> flagSet = flags.length == 0 ? EnumSet.noneOf(Flag.class) : EnumSet.copyOf(Arrays.asList(flags));
|
final EnumSet<Flag> flagSet = flags.length == 0 ? EnumSet.noneOf(Flag.class) : EnumSet.copyOf(Arrays.asList(flags));
|
||||||
|
|
||||||
|
@ -128,43 +172,89 @@ public class FrameFile implements Closeable
|
||||||
throw new FileNotFoundException(StringUtils.format("File [%s] not found", file));
|
throw new FileNotFoundException(StringUtils.format("File [%s] not found", file));
|
||||||
}
|
}
|
||||||
|
|
||||||
final Pair<Memory, Closeable> map = mapFile(file, flagSet);
|
// Closer for mmap that is shared across all references: either footer only (if file size is larger
|
||||||
final Memory memory = map.lhs;
|
// Integer.MAX_VALUE) or entire file (if file size is smaller than, or equal to, Integer.MAX_VALUE).
|
||||||
final Closeable mapCloser = Preconditions.checkNotNull(map.rhs, "closer");
|
Closeable sharedMapCloser = null;
|
||||||
|
|
||||||
|
try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
|
||||||
|
final long fileLength = randomAccessFile.length();
|
||||||
|
|
||||||
try {
|
|
||||||
// Verify minimum file length.
|
// Verify minimum file length.
|
||||||
if (memory.getCapacity() < FrameFileWriter.MAGIC.length + FrameFileWriter.TRAILER_LENGTH) {
|
if (fileLength <
|
||||||
throw new IOE("File [%s] is too short for magic + trailer", file);
|
FrameFileWriter.MAGIC.length + FrameFileWriter.TRAILER_LENGTH + Byte.BYTES /* MARKER_NO_MORE_FRAMES */) {
|
||||||
|
throw new IOE("File [%s] is too short (size = [%,d])", file, fileLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify magic.
|
// Verify magic.
|
||||||
if (!memory.equalTo(0, Memory.wrap(FrameFileWriter.MAGIC), 0, FrameFileWriter.MAGIC.length)) {
|
final byte[] buf = new byte[FrameFileWriter.TRAILER_LENGTH /* Larger than FrameFileWriter.MAGIC */];
|
||||||
|
final Memory bufMemory = Memory.wrap(buf, ByteOrder.LITTLE_ENDIAN);
|
||||||
|
randomAccessFile.readFully(buf, 0, FrameFileWriter.MAGIC.length);
|
||||||
|
|
||||||
|
if (!bufMemory.equalTo(0, Memory.wrap(FrameFileWriter.MAGIC), 0, FrameFileWriter.MAGIC.length)) {
|
||||||
throw new IOE("File [%s] is not a frame file", file);
|
throw new IOE("File [%s] is not a frame file", file);
|
||||||
}
|
}
|
||||||
|
|
||||||
final int numFrames = memory.getInt(memory.getCapacity() - Integer.BYTES * 2L);
|
// Read number of frames and partitions.
|
||||||
final int numPartitions = memory.getInt(memory.getCapacity() - Integer.BYTES);
|
randomAccessFile.seek(fileLength - FrameFileWriter.TRAILER_LENGTH);
|
||||||
|
randomAccessFile.readFully(buf, 0, FrameFileWriter.TRAILER_LENGTH);
|
||||||
|
|
||||||
// Verify last frame is followed by MARKER_NO_MORE_FRAMES.
|
final int numFrames = bufMemory.getInt(0);
|
||||||
final long endMarkerPosition;
|
final int numPartitions = bufMemory.getInt(Integer.BYTES);
|
||||||
|
final int footerLength = bufMemory.getInt(Integer.BYTES * 2L);
|
||||||
|
final int expectedFooterChecksum = bufMemory.getInt(Integer.BYTES * 3L);
|
||||||
|
|
||||||
if (numFrames > 0) {
|
if (footerLength < 0) {
|
||||||
endMarkerPosition = getFrameEndPosition(memory, numFrames - 1, numFrames);
|
throw new ISE("Negative-size footer. Corrupt or truncated file?");
|
||||||
|
} else if (footerLength > fileLength) {
|
||||||
|
throw new ISE("Oversize footer. Corrupt or truncated file?");
|
||||||
|
}
|
||||||
|
|
||||||
|
final Memory wholeFileMemory;
|
||||||
|
final Memory footerMemory;
|
||||||
|
|
||||||
|
if (fileLength <= maxMmapSize) {
|
||||||
|
// Map entire file, use region for footer.
|
||||||
|
final MappedByteBufferHandler mapHandle = FileUtils.map(randomAccessFile, 0, fileLength);
|
||||||
|
sharedMapCloser = mapHandle;
|
||||||
|
wholeFileMemory = Memory.wrap(mapHandle.get(), ByteOrder.LITTLE_ENDIAN);
|
||||||
|
|
||||||
|
if (wholeFileMemory.getCapacity() != fileLength) {
|
||||||
|
// Check that the mapped file is the expected length. May differ if the file was updated while we're trying
|
||||||
|
// to map it.
|
||||||
|
throw new ISE("Memory map size does not match file size");
|
||||||
|
}
|
||||||
|
|
||||||
|
footerMemory = wholeFileMemory.region(fileLength - footerLength, footerLength, ByteOrder.LITTLE_ENDIAN);
|
||||||
} else {
|
} else {
|
||||||
endMarkerPosition = FrameFileWriter.MAGIC.length;
|
// Map footer only. Will map the entire file in pages later, using "remap".
|
||||||
|
final MappedByteBufferHandler footerMapHandle =
|
||||||
|
FileUtils.map(randomAccessFile, fileLength - footerLength, footerLength);
|
||||||
|
sharedMapCloser = footerMapHandle;
|
||||||
|
wholeFileMemory = null;
|
||||||
|
footerMemory = Memory.wrap(footerMapHandle.get(), ByteOrder.LITTLE_ENDIAN);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (endMarkerPosition >= memory.getCapacity()) {
|
// Verify footer begins with MARKER_NO_MORE_FRAMES.
|
||||||
throw new IOE("File [%s] end marker location out of range", file);
|
if (footerMemory.getByte(0) != FrameFileWriter.MARKER_NO_MORE_FRAMES) {
|
||||||
}
|
|
||||||
|
|
||||||
if (memory.getByte(endMarkerPosition) != FrameFileWriter.MARKER_NO_MORE_FRAMES) {
|
|
||||||
throw new IOE("File [%s] end marker not in expected location", file);
|
throw new IOE("File [%s] end marker not in expected location", file);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify footer checksum.
|
||||||
|
final int actualChecksum =
|
||||||
|
(int) footerMemory.xxHash64(0, footerMemory.getCapacity() - Integer.BYTES, FrameFileWriter.CHECKSUM_SEED);
|
||||||
|
|
||||||
|
if (expectedFooterChecksum != actualChecksum) {
|
||||||
|
throw new ISE("Expected footer checksum did not match actual checksum. Corrupt or truncated file?");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify footer length.
|
||||||
|
if (footerLength != FrameFileWriter.footerLength(numFrames, numPartitions)) {
|
||||||
|
throw new ISE("Expected footer length did not match actual footer length. Corrupt or truncated file?");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up closer, refcounter; return instance.
|
||||||
final Closer fileCloser = Closer.create();
|
final Closer fileCloser = Closer.create();
|
||||||
fileCloser.register(mapCloser);
|
fileCloser.register(sharedMapCloser);
|
||||||
|
|
||||||
if (flagSet.contains(Flag.DELETE_ON_CLOSE)) {
|
if (flagSet.contains(Flag.DELETE_ON_CLOSE)) {
|
||||||
fileCloser.register(() -> {
|
fileCloser.register(() -> {
|
||||||
|
@ -177,15 +267,25 @@ public class FrameFile implements Closeable
|
||||||
final ReferenceCountingCloseableObject<Closeable> referenceCounter =
|
final ReferenceCountingCloseableObject<Closeable> referenceCounter =
|
||||||
new ReferenceCountingCloseableObject<Closeable>(fileCloser) {};
|
new ReferenceCountingCloseableObject<Closeable>(fileCloser) {};
|
||||||
|
|
||||||
return new FrameFile(file, memory, numFrames, numPartitions, referenceCounter, referenceCounter);
|
return new FrameFile(
|
||||||
|
file,
|
||||||
|
fileLength,
|
||||||
|
footerMemory,
|
||||||
|
wholeFileMemory,
|
||||||
|
maxMmapSize,
|
||||||
|
numFrames,
|
||||||
|
numPartitions,
|
||||||
|
referenceCounter,
|
||||||
|
referenceCounter
|
||||||
|
);
|
||||||
}
|
}
|
||||||
catch (Throwable e) {
|
catch (Throwable e) {
|
||||||
// Close mapCloser, not fileCloser: if there is an error in "open" then we don't delete the file.
|
// Close mapCloser, not fileCloser: if there is an error in "open" then we don't delete the file.
|
||||||
if (e instanceof IOException) {
|
if (e instanceof IOException) {
|
||||||
// Don't wrap IOExceptions.
|
// Don't wrap IOExceptions.
|
||||||
throw CloseableUtils.closeInCatch((IOException) e, mapCloser);
|
throw CloseableUtils.closeInCatch((IOException) e, sharedMapCloser);
|
||||||
} else {
|
} else {
|
||||||
throw CloseableUtils.closeAndWrapInCatch(e, mapCloser);
|
throw CloseableUtils.closeAndWrapInCatch(e, sharedMapCloser);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -221,17 +321,12 @@ public class FrameFile implements Closeable
|
||||||
return numFrames;
|
return numFrames;
|
||||||
} else {
|
} else {
|
||||||
final long partitionStartFrameLocation =
|
final long partitionStartFrameLocation =
|
||||||
memory.getCapacity()
|
footerMemory.getCapacity()
|
||||||
- FrameFileWriter.TRAILER_LENGTH
|
- FrameFileWriter.TRAILER_LENGTH
|
||||||
- (long) numFrames * Long.BYTES
|
- (long) numFrames * Long.BYTES
|
||||||
- (long) (numPartitions - partition) * Integer.BYTES;
|
- (long) (numPartitions - partition) * Integer.BYTES;
|
||||||
|
|
||||||
// Bounds check: protect against possibly-corrupt data.
|
return footerMemory.getInt(partitionStartFrameLocation);
|
||||||
if (partitionStartFrameLocation < 0 || partitionStartFrameLocation > memory.getCapacity() - Integer.BYTES) {
|
|
||||||
throw new ISE("Corrupt frame file: partition [%,d] marker out of range", partition);
|
|
||||||
}
|
|
||||||
|
|
||||||
return memory.getInt(partitionStartFrameLocation);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,29 +341,50 @@ public class FrameFile implements Closeable
|
||||||
throw new IAE("Frame [%,d] out of bounds", frameNumber);
|
throw new IAE("Frame [%,d] out of bounds", frameNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long frameEnd = getFrameEndPosition(memory, frameNumber, numFrames);
|
final long frameEnd = getFrameEndPosition(frameNumber);
|
||||||
final long frameStart;
|
final long frameStart;
|
||||||
|
|
||||||
if (frameNumber == 0) {
|
if (frameNumber == 0) {
|
||||||
frameStart = FrameFileWriter.MAGIC.length + Byte.BYTES /* MARKER_FRAME */;
|
frameStart = FrameFileWriter.MAGIC.length + Byte.BYTES /* MARKER_FRAME */;
|
||||||
} else {
|
} else {
|
||||||
frameStart = getFrameEndPosition(memory, frameNumber - 1, numFrames) + Byte.BYTES /* MARKER_FRAME */;
|
frameStart = getFrameEndPosition(frameNumber - 1) + Byte.BYTES /* MARKER_FRAME */;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buffer == null || frameStart < bufferOffset || frameEnd > bufferOffset + buffer.getCapacity()) {
|
||||||
|
remapBuffer(frameStart);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (frameStart < bufferOffset || frameEnd > bufferOffset + buffer.getCapacity()) {
|
||||||
|
// Still out of bounds after remapping successfully: must mean frame was too large to fit in maxMmapSize.
|
||||||
|
throw new ISE("Frame [%,d] too large (max size = %,d bytes)", frameNumber, maxMmapSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decompression is safe even on corrupt data: it validates position, length, checksum.
|
// Decompression is safe even on corrupt data: it validates position, length, checksum.
|
||||||
return Frame.decompress(memory, frameStart, frameEnd - frameStart);
|
return Frame.decompress(buffer, frameStart - bufferOffset, frameEnd - frameStart);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new reference to this file. Calling {@link #close()} releases the reference. The original file
|
* Creates a new reference to this file. Calling {@link #close()} releases the reference. The original file
|
||||||
* is closed when it, and all additional references, are closed.
|
* is closed when it, and all additional references, are closed.
|
||||||
|
*
|
||||||
|
* The new FrameFile instance may be used concurrently with the original FrameFile instance.
|
||||||
*/
|
*/
|
||||||
public FrameFile newReference()
|
public FrameFile newReference()
|
||||||
{
|
{
|
||||||
final Closeable releaser = referenceCounter.incrementReferenceAndDecrementOnceCloseable()
|
final Closeable releaser = referenceCounter.incrementReferenceAndDecrementOnceCloseable()
|
||||||
.orElseThrow(() -> new ISE("Frame file is closed"));
|
.orElseThrow(() -> new ISE("Frame file is closed"));
|
||||||
|
|
||||||
return new FrameFile(file, memory, numFrames, numPartitions, referenceCounter, releaser);
|
return new FrameFile(
|
||||||
|
file,
|
||||||
|
fileLength,
|
||||||
|
footerMemory,
|
||||||
|
bufferOffset == 0 && bufferCloser == null ? buffer : null, // If bufferCloser is null, buffer is shared
|
||||||
|
maxMmapSize,
|
||||||
|
numFrames,
|
||||||
|
numPartitions,
|
||||||
|
referenceCounter,
|
||||||
|
releaser
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -282,7 +398,7 @@ public class FrameFile implements Closeable
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
referenceReleaser.close();
|
CloseableUtils.closeAll(this::releaseBuffer, referenceReleaser);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -298,89 +414,65 @@ public class FrameFile implements Closeable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private long getFrameEndPosition(final int frameNumber)
|
||||||
* Maps a file, respecting the flags provided to {@link #open}.
|
|
||||||
*/
|
|
||||||
private static Pair<Memory, Closeable> mapFile(final File file, final EnumSet<Flag> flagSet) throws IOException
|
|
||||||
{
|
{
|
||||||
if (flagSet.contains(Flag.DS_MEMORY_MAP) && flagSet.contains(Flag.BB_MEMORY_MAP)) {
|
assert frameNumber >= 0 && frameNumber < numFrames;
|
||||||
throw new ISE("Cannot open with both [%s] and [%s]", Flag.DS_MEMORY_MAP, Flag.BB_MEMORY_MAP);
|
|
||||||
} else if (flagSet.contains(Flag.DS_MEMORY_MAP)) {
|
|
||||||
return mapFileDS(file);
|
|
||||||
} else if (flagSet.contains(Flag.BB_MEMORY_MAP)) {
|
|
||||||
return mapFileBB(file);
|
|
||||||
} else if (file.length() <= Integer.MAX_VALUE) {
|
|
||||||
// Prefer using ByteBuffer for small files, because "frame" can use it to avoid a copy when decompressing.
|
|
||||||
return mapFileBB(file);
|
|
||||||
} else {
|
|
||||||
return mapFileDS(file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maps a file using a MappedByteBuffer. This is preferred for small files, since it enables zero-copy decompression
|
|
||||||
* in {@link #open}.
|
|
||||||
*/
|
|
||||||
private static Pair<Memory, Closeable> mapFileBB(final File file) throws IOException
|
|
||||||
{
|
|
||||||
final MappedByteBuffer byteBuffer = Files.map(file, FileChannel.MapMode.READ_ONLY);
|
|
||||||
byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
|
|
||||||
return Pair.of(Memory.wrap(byteBuffer, ByteOrder.LITTLE_ENDIAN), () -> ByteBufferUtils.unmap(byteBuffer));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maps a file using the functionality in datasketches-memory.
|
|
||||||
*/
|
|
||||||
private static Pair<Memory, Closeable> mapFileDS(final File file)
|
|
||||||
{
|
|
||||||
final MapHandle mapHandle;
|
|
||||||
|
|
||||||
try {
|
|
||||||
mapHandle = Memory.map(file, 0, file.length(), ByteOrder.LITTLE_ENDIAN);
|
|
||||||
}
|
|
||||||
catch (NoClassDefFoundError | ExceptionInInitializerError e) {
|
|
||||||
throw handleMemoryMapError(e, JvmUtils.majorVersion());
|
|
||||||
}
|
|
||||||
|
|
||||||
return Pair.of(
|
|
||||||
mapHandle.get(),
|
|
||||||
() -> {
|
|
||||||
try {
|
|
||||||
mapHandle.close();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
static RuntimeException handleMemoryMapError(Throwable e, int javaMajorVersion)
|
|
||||||
{
|
|
||||||
// Memory.map does not work on JDK 14+ due to issues with AllocateDirectMap.
|
|
||||||
if (javaMajorVersion >= 14) {
|
|
||||||
throw new ISE(
|
|
||||||
"Cannot read frame files larger than %,d bytes with Java %d. Try using Java 11.",
|
|
||||||
Integer.MAX_VALUE,
|
|
||||||
javaMajorVersion
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// We don't have a good reason why this happened. Throw the original error.
|
|
||||||
throw new RE(e, "Could not map frame file");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static long getFrameEndPosition(final Memory memory, final int frameNumber, final int numFrames)
|
|
||||||
{
|
|
||||||
final long frameEndPointerPosition =
|
final long frameEndPointerPosition =
|
||||||
memory.getCapacity() - FrameFileWriter.TRAILER_LENGTH - (long) (numFrames - frameNumber) * Long.BYTES;
|
footerMemory.getCapacity() - FrameFileWriter.TRAILER_LENGTH - (long) (numFrames - frameNumber) * Long.BYTES;
|
||||||
|
|
||||||
|
final long frameEndPosition = footerMemory.getLong(frameEndPointerPosition);
|
||||||
|
|
||||||
// Bounds check: protect against possibly-corrupt data.
|
// Bounds check: protect against possibly-corrupt data.
|
||||||
if (frameEndPointerPosition < 0 || frameEndPointerPosition > memory.getCapacity() - Long.BYTES) {
|
if (frameEndPosition < 0 || frameEndPosition > fileLength - footerMemory.getCapacity()) {
|
||||||
throw new ISE("Corrupt frame file: frame [%,d] location out of range", frameNumber);
|
throw new ISE("Corrupt frame file: frame [%,d] location out of range", frameNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
return memory.getLong(frameEndPointerPosition);
|
return frameEndPosition;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates {@link #buffer}, {@link #bufferOffset}, and {@link #bufferCloser} to a new offset. Closes the old
|
||||||
|
* buffer, if any.
|
||||||
|
*/
|
||||||
|
private void remapBuffer(final long offset)
|
||||||
|
{
|
||||||
|
releaseBuffer();
|
||||||
|
|
||||||
|
if (offset >= fileLength) {
|
||||||
|
throw new IAE("Offset [%,d] out of range for file length [%,d]", offset, fileLength);
|
||||||
|
}
|
||||||
|
|
||||||
|
final MappedByteBufferHandler mapHandle;
|
||||||
|
|
||||||
|
try {
|
||||||
|
mapHandle = FileUtils.map(file, offset, Math.min(fileLength - offset, maxMmapSize));
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer = Memory.wrap(mapHandle.get(), ByteOrder.LITTLE_ENDIAN);
|
||||||
|
bufferCloser = mapHandle::close;
|
||||||
|
bufferOffset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Nulls out {@link #buffer} and {@link #bufferCloser} references.
|
||||||
|
*
|
||||||
|
* Explicitly frees {@link #buffer} if {@link #bufferCloser} is set. If {@link #buffer} is set, but
|
||||||
|
* {@link #bufferCloser} is not set, it is a shared buffer, and is not freed.
|
||||||
|
*/
|
||||||
|
private void releaseBuffer()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
if (bufferCloser != null) {
|
||||||
|
bufferCloser.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
buffer = null;
|
||||||
|
bufferCloser = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import javax.annotation.Nullable;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,7 +43,8 @@ public class FrameFileWriter implements Closeable
|
||||||
public static final byte[] MAGIC = {(byte) 0xff, 0x01};
|
public static final byte[] MAGIC = {(byte) 0xff, 0x01};
|
||||||
public static final byte MARKER_FRAME = (byte) 0x01;
|
public static final byte MARKER_FRAME = (byte) 0x01;
|
||||||
public static final byte MARKER_NO_MORE_FRAMES = (byte) 0x02;
|
public static final byte MARKER_NO_MORE_FRAMES = (byte) 0x02;
|
||||||
public static final int TRAILER_LENGTH = Integer.BYTES * 2;
|
public static final int TRAILER_LENGTH = Integer.BYTES * 4;
|
||||||
|
public static final int CHECKSUM_SEED = 0;
|
||||||
public static final int NO_PARTITION = -1;
|
public static final int NO_PARTITION = -1;
|
||||||
|
|
||||||
private final WritableByteChannel channel;
|
private final WritableByteChannel channel;
|
||||||
|
@ -177,18 +179,33 @@ public class FrameFileWriter implements Closeable
|
||||||
|
|
||||||
writeMagicIfNeeded();
|
writeMagicIfNeeded();
|
||||||
|
|
||||||
if (!tableOfContents.reserveAdditional(TRAILER_LENGTH)) {
|
if (!tableOfContents.reserveAdditional(Integer.BYTES * 3)) {
|
||||||
throw new ISE("Can't finish table of contents");
|
throw new ISE("Can't finish table of contents");
|
||||||
}
|
}
|
||||||
|
|
||||||
final MemoryRange<WritableMemory> tocCursor = tableOfContents.cursor();
|
final MemoryRange<WritableMemory> tocCursor = tableOfContents.cursor();
|
||||||
|
final int numPartitions = Ints.checkedCast(partitions.size() / Integer.BYTES);
|
||||||
|
|
||||||
tocCursor.memory().putInt(tocCursor.start(), numFrames);
|
tocCursor.memory().putInt(tocCursor.start(), numFrames);
|
||||||
tocCursor.memory().putInt(tocCursor.start() + Integer.BYTES, Ints.checkedCast(partitions.size() / Integer.BYTES));
|
tocCursor.memory().putInt(tocCursor.start() + Integer.BYTES, numPartitions);
|
||||||
tableOfContents.advanceCursor(TRAILER_LENGTH);
|
tocCursor.memory().putInt(tocCursor.start() + Integer.BYTES * 2L, footerLength(numFrames, numPartitions));
|
||||||
channel.write(ByteBuffer.wrap(new byte[]{MARKER_NO_MORE_FRAMES}));
|
tableOfContents.advanceCursor(Integer.BYTES * 3);
|
||||||
partitions.writeTo(channel);
|
|
||||||
|
// Buffer up the footer so we can compute its checksum.
|
||||||
|
final ByteBuffer footerBuf = ByteBuffer.allocate(footerLength(numFrames, numPartitions));
|
||||||
|
final WritableMemory footerMemory = WritableMemory.writableWrap(footerBuf, ByteOrder.LITTLE_ENDIAN);
|
||||||
|
assert Byte.BYTES + partitions.size() + tableOfContents.size() + Integer.BYTES == footerMemory.getCapacity();
|
||||||
|
long p = Byte.BYTES;
|
||||||
|
footerMemory.putByte(0, MARKER_NO_MORE_FRAMES);
|
||||||
|
p += partitions.writeTo(footerMemory, p);
|
||||||
partitions.close();
|
partitions.close();
|
||||||
tableOfContents.writeTo(channel);
|
p += tableOfContents.writeTo(footerMemory, p);
|
||||||
tableOfContents.close();
|
tableOfContents.close();
|
||||||
|
final int checksum = (int) footerMemory.xxHash64(0, p, CHECKSUM_SEED);
|
||||||
|
footerMemory.putInt(p, checksum);
|
||||||
|
|
||||||
|
// Write footer to the channel.
|
||||||
|
Channels.writeFully(channel, footerBuf);
|
||||||
channel.close();
|
channel.close();
|
||||||
compressionBuffer = null;
|
compressionBuffer = null;
|
||||||
closed = true;
|
closed = true;
|
||||||
|
@ -213,4 +230,18 @@ public class FrameFileWriter implements Closeable
|
||||||
|
|
||||||
return compressionBuffer;
|
return compressionBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Length of the footer: everything from MARKER_NO_MORE_FRAMES to EOF. See class-level javadoc from {@link FrameFile}
|
||||||
|
* for details on the format.
|
||||||
|
*/
|
||||||
|
static int footerLength(final int numFrames, final int numPartitions)
|
||||||
|
{
|
||||||
|
return Ints.checkedCast(
|
||||||
|
Byte.BYTES // MARKER_NO_MORE_FRAMES
|
||||||
|
+ (long) Integer.BYTES * numPartitions
|
||||||
|
+ (long) Long.BYTES * numFrames
|
||||||
|
+ TRAILER_LENGTH
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,12 +23,10 @@ import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.io.Files;
|
import com.google.common.io.Files;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import org.apache.datasketches.memory.MapHandle;
|
|
||||||
import org.apache.datasketches.memory.Memory;
|
import org.apache.datasketches.memory.Memory;
|
||||||
import org.apache.datasketches.memory.WritableMemory;
|
import org.apache.datasketches.memory.WritableMemory;
|
||||||
import org.apache.druid.frame.key.SortColumn;
|
import org.apache.druid.frame.key.SortColumn;
|
||||||
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
|
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
|
||||||
import org.apache.druid.frame.testutil.FrameTestUtil;
|
|
||||||
import org.apache.druid.java.util.common.ByteBufferUtils;
|
import org.apache.druid.java.util.common.ByteBufferUtils;
|
||||||
import org.apache.druid.java.util.common.io.Closer;
|
import org.apache.druid.java.util.common.io.Closer;
|
||||||
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||||
|
@ -59,9 +57,7 @@ import java.nio.ByteOrder;
|
||||||
import java.nio.MappedByteBuffer;
|
import java.nio.MappedByteBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
@RunWith(Enclosed.class)
|
@RunWith(Enclosed.class)
|
||||||
public class FrameTest
|
public class FrameTest
|
||||||
|
@ -288,47 +284,6 @@ public class FrameTest
|
||||||
FRAME_DATA_COMPRESSED.length
|
FRAME_DATA_COMPRESSED.length
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
},
|
|
||||||
MEMORY_FILE {
|
|
||||||
@Override
|
|
||||||
Frame wrap(Closer closer) throws IOException
|
|
||||||
{
|
|
||||||
final File file = File.createTempFile("frame-test", "");
|
|
||||||
closer.register(file::delete);
|
|
||||||
Files.write(FRAME_DATA, file);
|
|
||||||
final MapHandle mapHandle = Memory.map(file);
|
|
||||||
closer.register(
|
|
||||||
() -> {
|
|
||||||
try {
|
|
||||||
mapHandle.close();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return Frame.wrap(mapHandle.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
Frame decompress(Closer closer) throws IOException
|
|
||||||
{
|
|
||||||
final File file = File.createTempFile("frame-test", "");
|
|
||||||
closer.register(file::delete);
|
|
||||||
Files.write(FRAME_DATA_COMPRESSED, file);
|
|
||||||
final MapHandle mapHandle = Memory.map(file);
|
|
||||||
closer.register(
|
|
||||||
() -> {
|
|
||||||
try {
|
|
||||||
mapHandle.close();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return Frame.decompress(mapHandle.get(), 0, FRAME_DATA_COMPRESSED.length);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
abstract Frame wrap(Closer closer) throws IOException;
|
abstract Frame wrap(Closer closer) throws IOException;
|
||||||
|
@ -351,10 +306,7 @@ public class FrameTest
|
||||||
{
|
{
|
||||||
final List<Object[]> constructors = new ArrayList<>();
|
final List<Object[]> constructors = new ArrayList<>();
|
||||||
|
|
||||||
for (MemType memType :
|
for (MemType memType : MemType.values()) {
|
||||||
Arrays.stream(MemType.values())
|
|
||||||
.filter(m -> FrameTestUtil.jdkCanDataSketchesMemoryMap() || m != MemType.MEMORY_FILE)
|
|
||||||
.collect(Collectors.toList())) {
|
|
||||||
for (boolean compressed : new boolean[]{true, false}) {
|
for (boolean compressed : new boolean[]{true, false}) {
|
||||||
constructors.add(new Object[]{memType, compressed});
|
constructors.add(new Object[]{memType, compressed});
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,15 +41,12 @@ import org.apache.druid.segment.column.RowSignature;
|
||||||
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.hamcrest.CoreMatchers;
|
|
||||||
import org.hamcrest.MatcherAssert;
|
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.internal.matchers.ThrowableMessageMatcher;
|
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
@ -121,7 +118,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
private final int maxRowsPerFrame;
|
private final int maxRowsPerFrame;
|
||||||
private final boolean partitioned;
|
private final boolean partitioned;
|
||||||
private final AdapterType adapterType;
|
private final AdapterType adapterType;
|
||||||
private final FrameFile.Flag openMode;
|
private final int maxMmapSize;
|
||||||
|
|
||||||
private StorageAdapter adapter;
|
private StorageAdapter adapter;
|
||||||
private File file;
|
private File file;
|
||||||
|
@ -131,14 +128,14 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
final int maxRowsPerFrame,
|
final int maxRowsPerFrame,
|
||||||
final boolean partitioned,
|
final boolean partitioned,
|
||||||
final AdapterType adapterType,
|
final AdapterType adapterType,
|
||||||
final FrameFile.Flag openMode
|
final int maxMmapSize
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.frameType = frameType;
|
this.frameType = frameType;
|
||||||
this.maxRowsPerFrame = maxRowsPerFrame;
|
this.maxRowsPerFrame = maxRowsPerFrame;
|
||||||
this.partitioned = partitioned;
|
this.partitioned = partitioned;
|
||||||
this.adapterType = adapterType;
|
this.adapterType = adapterType;
|
||||||
this.openMode = openMode;
|
this.maxMmapSize = maxMmapSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Parameterized.Parameters(
|
@Parameterized.Parameters(
|
||||||
|
@ -146,27 +143,26 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
+ "maxRowsPerFrame = {1}, "
|
+ "maxRowsPerFrame = {1}, "
|
||||||
+ "partitioned = {2}, "
|
+ "partitioned = {2}, "
|
||||||
+ "adapter = {3}, "
|
+ "adapter = {3}, "
|
||||||
+ "openMode = {4}"
|
+ "maxMmapSize = {4}"
|
||||||
)
|
)
|
||||||
public static Iterable<Object[]> constructorFeeder()
|
public static Iterable<Object[]> constructorFeeder()
|
||||||
{
|
{
|
||||||
final List<Object[]> constructors = new ArrayList<>();
|
final List<Object[]> constructors = new ArrayList<>();
|
||||||
final List<FrameFile.Flag> openModes = new ArrayList<>();
|
|
||||||
openModes.add(FrameFile.Flag.BB_MEMORY_MAP);
|
|
||||||
|
|
||||||
if (FrameTestUtil.jdkCanDataSketchesMemoryMap()) {
|
|
||||||
// datasketches-memory mapping only works up through JDK 13. Higher JDK versions are unable to load 2GB+ files.
|
|
||||||
// Skip these tests on higher JDK versions, since we test with JDK 15 in CI even though we don't officially
|
|
||||||
// support it yet.
|
|
||||||
openModes.add(FrameFile.Flag.DS_MEMORY_MAP);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (FrameType frameType : FrameType.values()) {
|
for (FrameType frameType : FrameType.values()) {
|
||||||
for (int maxRowsPerFrame : new int[]{1, 17, 50, PARTITION_SIZE, Integer.MAX_VALUE}) {
|
for (int maxRowsPerFrame : new int[]{1, 17, 50, PARTITION_SIZE, Integer.MAX_VALUE}) {
|
||||||
for (boolean partitioned : new boolean[]{true, false}) {
|
for (boolean partitioned : new boolean[]{true, false}) {
|
||||||
for (AdapterType adapterType : AdapterType.values()) {
|
for (AdapterType adapterType : AdapterType.values()) {
|
||||||
for (FrameFile.Flag openMode : openModes) {
|
final int[] maxMmapSizes;
|
||||||
constructors.add(new Object[]{frameType, maxRowsPerFrame, partitioned, adapterType, openMode});
|
|
||||||
|
if (maxRowsPerFrame == 1) {
|
||||||
|
maxMmapSizes = new int[]{1_000, 10_000, Integer.MAX_VALUE};
|
||||||
|
} else {
|
||||||
|
maxMmapSizes = new int[]{Integer.MAX_VALUE};
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int maxMmapSize : maxMmapSizes) {
|
||||||
|
constructors.add(new Object[]{frameType, maxRowsPerFrame, partitioned, adapterType, maxMmapSize});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,7 +211,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_numFrames() throws IOException
|
public void test_numFrames() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
Assert.assertEquals(computeExpectedNumFrames(), frameFile.numFrames());
|
Assert.assertEquals(computeExpectedNumFrames(), frameFile.numFrames());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,7 +219,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_numPartitions() throws IOException
|
public void test_numPartitions() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
Assert.assertEquals(computeExpectedNumPartitions(), frameFile.numPartitions());
|
Assert.assertEquals(computeExpectedNumPartitions(), frameFile.numPartitions());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -231,7 +227,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_frame_first() throws IOException
|
public void test_frame_first() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
// Skip test for empty files.
|
// Skip test for empty files.
|
||||||
Assume.assumeThat(frameFile.numFrames(), Matchers.greaterThan(0));
|
Assume.assumeThat(frameFile.numFrames(), Matchers.greaterThan(0));
|
||||||
|
|
||||||
|
@ -243,7 +239,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_frame_last() throws IOException
|
public void test_frame_last() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
// Skip test for empty files.
|
// Skip test for empty files.
|
||||||
Assume.assumeThat(frameFile.numFrames(), Matchers.greaterThan(0));
|
Assume.assumeThat(frameFile.numFrames(), Matchers.greaterThan(0));
|
||||||
|
|
||||||
|
@ -260,7 +256,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_frame_outOfBoundsNegative() throws IOException
|
public void test_frame_outOfBoundsNegative() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
expectedException.expect(IllegalArgumentException.class);
|
expectedException.expect(IllegalArgumentException.class);
|
||||||
expectedException.expectMessage("Frame [-1] out of bounds");
|
expectedException.expectMessage("Frame [-1] out of bounds");
|
||||||
frameFile.frame(-1);
|
frameFile.frame(-1);
|
||||||
|
@ -270,7 +266,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_frame_outOfBoundsTooLarge() throws IOException
|
public void test_frame_outOfBoundsTooLarge() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
expectedException.expect(IllegalArgumentException.class);
|
expectedException.expect(IllegalArgumentException.class);
|
||||||
expectedException.expectMessage(StringUtils.format("Frame [%,d] out of bounds", frameFile.numFrames()));
|
expectedException.expectMessage(StringUtils.format("Frame [%,d] out of bounds", frameFile.numFrames()));
|
||||||
frameFile.frame(frameFile.numFrames());
|
frameFile.frame(frameFile.numFrames());
|
||||||
|
@ -282,7 +278,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
{
|
{
|
||||||
final FrameReader frameReader = FrameReader.create(adapter.getRowSignature());
|
final FrameReader frameReader = FrameReader.create(adapter.getRowSignature());
|
||||||
|
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
final Sequence<List<Object>> frameFileRows = Sequences.concat(
|
final Sequence<List<Object>> frameFileRows = Sequences.concat(
|
||||||
() -> IntStream.range(0, frameFile.numFrames())
|
() -> IntStream.range(0, frameFile.numFrames())
|
||||||
.mapToObj(frameFile::frame)
|
.mapToObj(frameFile::frame)
|
||||||
|
@ -299,7 +295,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_getPartitionStartFrame() throws IOException
|
public void test_getPartitionStartFrame() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
if (partitioned) {
|
if (partitioned) {
|
||||||
for (int partitionNum = 0; partitionNum < frameFile.numPartitions(); partitionNum++) {
|
for (int partitionNum = 0; partitionNum < frameFile.numPartitions(); partitionNum++) {
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
|
@ -324,7 +320,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_file() throws IOException
|
public void test_file() throws IOException
|
||||||
{
|
{
|
||||||
try (final FrameFile frameFile = FrameFile.open(file, openMode)) {
|
try (final FrameFile frameFile = FrameFile.open(file, maxMmapSize)) {
|
||||||
Assert.assertEquals(file, frameFile.file());
|
Assert.assertEquals(file, frameFile.file());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -332,7 +328,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
@Test
|
@Test
|
||||||
public void test_open_withDeleteOnClose() throws IOException
|
public void test_open_withDeleteOnClose() throws IOException
|
||||||
{
|
{
|
||||||
FrameFile.open(file, openMode).close();
|
FrameFile.open(file, maxMmapSize).close();
|
||||||
Assert.assertTrue(file.exists());
|
Assert.assertTrue(file.exists());
|
||||||
|
|
||||||
FrameFile.open(file, FrameFile.Flag.DELETE_ON_CLOSE).close();
|
FrameFile.open(file, FrameFile.Flag.DELETE_ON_CLOSE).close();
|
||||||
|
@ -375,52 +371,6 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
||||||
frameFile1.newReference();
|
frameFile1.newReference();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test_handleMemoryMapError_java11()
|
|
||||||
{
|
|
||||||
@SuppressWarnings("ThrowableNotThrown")
|
|
||||||
final RuntimeException e = Assert.assertThrows(
|
|
||||||
RuntimeException.class,
|
|
||||||
() -> FrameFile.handleMemoryMapError(new IllegalAccessError("foo"), 11)
|
|
||||||
);
|
|
||||||
|
|
||||||
MatcherAssert.assertThat(
|
|
||||||
e,
|
|
||||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Could not map frame file"))
|
|
||||||
);
|
|
||||||
|
|
||||||
// Include the original error, since we don't have a better explanation.
|
|
||||||
MatcherAssert.assertThat(
|
|
||||||
e.getCause(),
|
|
||||||
CoreMatchers.instanceOf(IllegalAccessError.class)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test_handleMemoryMapError_java17()
|
|
||||||
{
|
|
||||||
@SuppressWarnings("ThrowableNotThrown")
|
|
||||||
final IllegalStateException e = Assert.assertThrows(
|
|
||||||
IllegalStateException.class,
|
|
||||||
() -> FrameFile.handleMemoryMapError(new IllegalAccessError("foo"), 17)
|
|
||||||
);
|
|
||||||
|
|
||||||
MatcherAssert.assertThat(
|
|
||||||
e,
|
|
||||||
ThrowableMessageMatcher.hasMessage(
|
|
||||||
CoreMatchers.containsString(
|
|
||||||
StringUtils.format(
|
|
||||||
"Cannot read frame files larger than %,d bytes with Java 17.",
|
|
||||||
Integer.MAX_VALUE
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Cause not included; we want to keep logs relatively cleaner and highlight the actual issue.
|
|
||||||
Assert.assertNull(e.getCause());
|
|
||||||
}
|
|
||||||
|
|
||||||
private int computeExpectedNumFrames()
|
private int computeExpectedNumFrames()
|
||||||
{
|
{
|
||||||
return IntMath.divide(countRows(adapter), maxRowsPerFrame, RoundingMode.CEILING);
|
return IntMath.divide(countRows(adapter), maxRowsPerFrame, RoundingMode.CEILING);
|
||||||
|
|
|
@ -70,16 +70,11 @@ public class FrameFileWriterTest extends InitializedNullHandlingTest
|
||||||
|
|
||||||
fileWriter.abort();
|
fileWriter.abort();
|
||||||
|
|
||||||
final IOException e = Assert.assertThrows(IOException.class, () -> FrameFile.open(file));
|
final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, () -> FrameFile.open(file));
|
||||||
|
|
||||||
MatcherAssert.assertThat(
|
MatcherAssert.assertThat(
|
||||||
e,
|
e,
|
||||||
ThrowableMessageMatcher.hasMessage(
|
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Corrupt or truncated file?"))
|
||||||
CoreMatchers.anyOf(
|
|
||||||
CoreMatchers.containsString("end marker location out of range"),
|
|
||||||
CoreMatchers.containsString("end marker not in expected location")
|
|
||||||
)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.druid.segment.data.IndexedInts;
|
||||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||||
import org.apache.druid.segment.vector.VectorCursor;
|
import org.apache.druid.segment.vector.VectorCursor;
|
||||||
import org.apache.druid.timeline.SegmentId;
|
import org.apache.druid.timeline.SegmentId;
|
||||||
import org.apache.druid.utils.JvmUtils;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -313,15 +312,6 @@ public class FrameTestUtil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Whether the current JDK supports {@link org.apache.datasketches.memory.Memory#map}. This is needed to read
|
|
||||||
* frame files 2GB+ in size.
|
|
||||||
*/
|
|
||||||
public static boolean jdkCanDataSketchesMemoryMap()
|
|
||||||
{
|
|
||||||
return JvmUtils.majorVersion() < 14;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Supplier<Object> dimensionSelectorReader(final DimensionSelector selector)
|
private static Supplier<Object> dimensionSelectorReader(final DimensionSelector selector)
|
||||||
{
|
{
|
||||||
return () -> {
|
return () -> {
|
||||||
|
|
Loading…
Reference in New Issue