Aggregate files from the same segment into a single Arena (#13570)

This commit adds a ref counted shared arena to support aggregating segment files into a single Arena.
This commit is contained in:
Chris Hegarty 2024-07-25 10:12:02 +01:00 committed by GitHub
parent 4c1d50d8e8
commit b4fb425c43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 483 additions and 11 deletions

View File

@ -300,6 +300,13 @@ Optimizations
* GITHUB#13582: Stop requiring MaxScoreBulkScorer's outer window from having at
least INNER_WINDOW_SIZE docs. (Adrien Grand)
* GITHUB#13570, GITHUB#13574, GITHUB#13535: Avoid performance degradation with closing shared Arenas.
Closing many individual index files can potentially lead to a degradation in execution performance.
Index files are mmapped one-to-one with the JDK's foreign shared Arena. The JVM deoptimizes the top
few frames of all threads when closing a shared Arena (see JDK-8335480). We mitigate this situation
by 1) using a confined Arena where appropriate, and 2) grouping files from the same segment to a
single shared Arena. (Chris Hegarty, Michael Gibney, Uwe Schindler)
Changes in runtime behavior
---------------------

View File

@ -16,14 +16,20 @@
*/
package org.apache.lucene.store;
import static org.apache.lucene.index.IndexFileNames.CODEC_FILE_PATTERN;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.channels.ClosedChannelException; // javadoc @link
import java.nio.file.Path;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.logging.Logger;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.Constants;
/**
@ -42,6 +48,11 @@ import org.apache.lucene.util.Constants;
* performance of searches on a cold page cache at the expense of slowing down opening an index. See
* {@link #setPreload(BiPredicate)} for more details.
*
* <p>This class supports grouping of files that are part of the same logical group. This is a hint
* that allows for better handling of resources. For example, individual files that are part of the
* same segment can be considered part of the same logical group. See {@link
* #setGroupingFunction(Function)} for more details.
*
* <p>This class will use the modern {@link java.lang.foreign.MemorySegment} API available since
* Java 21 which allows to safely unmap previously mmapped files after closing the {@link
* IndexInput}s. There is no need to enable the "preview feature" of your Java version; it works out
@ -83,6 +94,41 @@ public class MMapDirectory extends FSDirectory {
*/
public static final BiPredicate<String, IOContext> NO_FILES = (filename, context) -> false;
/**
* This sysprop allows to control the total maximum number of mmapped files that can be associated
* with a single shared {@link java.lang.foreign.Arena foreign Arena}. For example, to set the max
* number of permits to 256, pass the following on the command line pass {@code
* -Dorg.apache.lucene.store.MMapDirectory.sharedArenaMaxPermits=256}. Setting a value of 1
* associates one file to one shared arena.
*
* @lucene.internal
*/
public static final String SHARED_ARENA_MAX_PERMITS_SYSPROP =
"org.apache.lucene.store.MMapDirectory.sharedArenaMaxPermits";
/** Argument for {@link #setGroupingFunction(Function)} that configures no grouping. */
public static final Function<String, Optional<String>> NO_GROUPING = filename -> Optional.empty();
/** Argument for {@link #setGroupingFunction(Function)} that configures grouping by segment. */
public static final Function<String, Optional<String>> GROUP_BY_SEGMENT =
filename -> {
if (!CODEC_FILE_PATTERN.matcher(filename).matches()) {
return Optional.empty();
}
String groupKey = IndexFileNames.parseSegmentName(filename).substring(1);
try {
// keep the original generation (=0) in base group, later generations in extra group
if (IndexFileNames.parseGeneration(filename) > 0) {
groupKey += "-g";
}
} catch (
@SuppressWarnings("unused")
NumberFormatException unused) {
// does not confirm to the generation syntax, or trash
}
return Optional.of(groupKey);
};
/**
* Argument for {@link #setPreload(BiPredicate)} that configures files to be preloaded upon
* opening them if they use the {@link ReadAdvice#RANDOM_PRELOAD} advice.
@ -102,6 +148,11 @@ public class MMapDirectory extends FSDirectory {
*/
public static final long DEFAULT_MAX_CHUNK_SIZE;
/** A provider specific context object or null, that will be passed to openInput. */
final Object attachment = PROVIDER.attachment();
private Function<String, Optional<String>> groupingFunction = GROUP_BY_SEGMENT;
final int chunkSizePower;
/**
@ -184,6 +235,21 @@ public class MMapDirectory extends FSDirectory {
this.preload = preload;
}
/**
* Configures a grouping function for files that are part of the same logical group. The gathering
* of files into a logical group is a hint that allows for better handling of resources.
*
* <p>By default, grouping is {@link #GROUP_BY_SEGMENT}. To disable, invoke this method with
* {@link #NO_GROUPING}.
*
* @param groupingFunction a function that accepts a file name and returns an optional group key.
* If the optional is present, then its value is the logical group to which the file belongs.
* Otherwise, the file name if not associated with any logical group.
*/
public void setGroupingFunction(Function<String, Optional<String>> groupingFunction) {
this.groupingFunction = groupingFunction;
}
/**
* Returns the current mmap chunk size.
*
@ -199,20 +265,37 @@ public class MMapDirectory extends FSDirectory {
ensureOpen();
ensureCanRead(name);
Path path = directory.resolve(name);
return PROVIDER.openInput(path, context, chunkSizePower, preload.test(name, context));
return PROVIDER.openInput(
path,
context,
chunkSizePower,
preload.test(name, context),
groupingFunction.apply(name),
attachment);
}
// visible for tests:
static final MMapIndexInputProvider PROVIDER;
static final MMapIndexInputProvider<Object> PROVIDER;
interface MMapIndexInputProvider {
IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
interface MMapIndexInputProvider<A> {
IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
Optional<String> group,
A attachment)
throws IOException;
long getDefaultMaxChunkSize();
boolean supportsMadvise();
/** An optional attachment of the provider, that will be passed to openInput. */
default A attachment() {
return null;
}
default IOException convertMapFailedIOException(
IOException ioe, String resourceDescription, long bufSize) {
final String originalMessage;
@ -256,15 +339,33 @@ public class MMapDirectory extends FSDirectory {
}
}
private static MMapIndexInputProvider lookupProvider() {
private static int getSharedArenaMaxPermitsSysprop() {
int ret = 1024; // default value
try {
String str = System.getProperty(SHARED_ARENA_MAX_PERMITS_SYSPROP);
if (str != null) {
ret = Integer.parseInt(str);
}
} catch (@SuppressWarnings("unused") NumberFormatException | SecurityException ignored) {
Logger.getLogger(MMapDirectory.class.getName())
.warning(
"Cannot read sysprop "
+ SHARED_ARENA_MAX_PERMITS_SYSPROP
+ ", so the default value will be used.");
}
return ret;
}
private static <A> MMapIndexInputProvider<A> lookupProvider() {
final var maxPermits = getSharedArenaMaxPermitsSysprop();
final var lookup = MethodHandles.lookup();
try {
final var cls = lookup.findClass("org.apache.lucene.store.MemorySegmentIndexInputProvider");
// we use method handles, so we do not need to deal with setAccessible as we have private
// access through the lookup:
final var constr = lookup.findConstructor(cls, MethodType.methodType(void.class));
final var constr = lookup.findConstructor(cls, MethodType.methodType(void.class, int.class));
try {
return (MMapIndexInputProvider) constr.invoke();
return (MMapIndexInputProvider<A>) constr.invoke(maxPermits);
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable th) {

View File

@ -24,20 +24,32 @@ import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.Unwrappable;
@SuppressWarnings("preview")
final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexInputProvider {
final class MemorySegmentIndexInputProvider
implements MMapDirectory.MMapIndexInputProvider<
ConcurrentHashMap<String, RefCountedSharedArena>> {
private final Optional<NativeAccess> nativeAccess;
private final int sharedArenaMaxPermits;
MemorySegmentIndexInputProvider() {
MemorySegmentIndexInputProvider(int maxPermits) {
this.nativeAccess = NativeAccess.getImplementation();
this.sharedArenaMaxPermits = checkMaxPermits(maxPermits);
}
@Override
public IndexInput openInput(Path path, IOContext context, int chunkSizePower, boolean preload)
public IndexInput openInput(
Path path,
IOContext context,
int chunkSizePower,
boolean preload,
Optional<String> group,
ConcurrentHashMap<String, RefCountedSharedArena> arenas)
throws IOException {
final String resourceDescription = "MemorySegmentIndexInput(path=\"" + path.toString() + "\")";
@ -46,7 +58,7 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn
boolean success = false;
final boolean confined = context == IOContext.READONCE;
final Arena arena = confined ? Arena.ofConfined() : Arena.ofShared();
final Arena arena = confined ? Arena.ofConfined() : getSharedArena(group, arenas);
try (var fc = FileChannel.open(path, StandardOpenOption.READ)) {
final long fileSize = fc.size();
final IndexInput in =
@ -125,4 +137,53 @@ final class MemorySegmentIndexInputProvider implements MMapDirectory.MMapIndexIn
}
return segments;
}
@Override
public ConcurrentHashMap<String, RefCountedSharedArena> attachment() {
return new ConcurrentHashMap<>();
}
private static int checkMaxPermits(int maxPermits) {
if (RefCountedSharedArena.validMaxPermits(maxPermits)) {
return maxPermits;
}
Logger.getLogger(MemorySegmentIndexInputProvider.class.getName())
.warning(
"Invalid value for sysprop "
+ MMapDirectory.SHARED_ARENA_MAX_PERMITS_SYSPROP
+ ", must be positive and <= 0x07FF. The default value will be used.");
return RefCountedSharedArena.DEFAULT_MAX_PERMITS;
}
/**
* Gets an arena for the given group, potentially aggregating files from the same segment into a
* single ref counted shared arena. A ref counted shared arena, if created will be added to the
* given arenas map.
*/
private Arena getSharedArena(
Optional<String> group, ConcurrentHashMap<String, RefCountedSharedArena> arenas) {
if (group.isEmpty()) {
return Arena.ofShared();
}
String key = group.get();
var refCountedArena =
arenas.computeIfAbsent(
key, s -> new RefCountedSharedArena(s, () -> arenas.remove(s), sharedArenaMaxPermits));
if (refCountedArena.acquire()) {
return refCountedArena;
} else {
return arenas.compute(
key,
(s, v) -> {
if (v != null && v.acquire()) {
return v;
} else {
v = new RefCountedSharedArena(s, () -> arenas.remove(s), sharedArenaMaxPermits);
v.acquire(); // guaranteed to succeed
return v;
}
});
}
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.store;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A reference counted shared Arena.
*
* <p>The purpose of this class is to allow a number of mmapped memory segments to be associated
* with a single underlying arena in order to avoid closing the underlying arena until all segments
* are closed. Typically, these memory segments belong to the same logical group, e.g. individual
* files of the same index segment. We do this to avoid the expensive cost of closing a shared
* Arena.
*
* <p>The reference count is increased by {@link #acquire()}, and decreased by {@link #release()}.
* When the reference count reaches 0, then the underlying arena is closed and the given {@code
* onClose} runnable is executed. No more references can be acquired.
*
* <p>The total number of acquires that can be obtained for the lifetime of an instance of this
* class is 1024. When the total number of acquires is exhausted, then no more acquires are
* permitted and {@link #acquire()} returns false. This is independent of the actual number of the
* ref count.
*/
@SuppressWarnings("preview")
final class RefCountedSharedArena implements Arena {
// default maximum permits
static final int DEFAULT_MAX_PERMITS = 1024;
private static final int CLOSED = 0;
// minimum value, beyond which permits are exhausted
private static final int REMAINING_UNIT = 1 << 16;
// acquire decrement; effectively decrements permits and increments ref count
private static final int ACQUIRE_DECREMENT = REMAINING_UNIT - 1; // 0xffff
private final String segmentName;
private final Runnable onClose;
private final Arena arena;
// high 16 bits contain the total remaining acquires; monotonically decreasing
// low 16 bit contain the current ref count
private final AtomicInteger state;
RefCountedSharedArena(String segmentName, Runnable onClose) {
this(segmentName, onClose, DEFAULT_MAX_PERMITS);
}
RefCountedSharedArena(String segmentName, Runnable onClose, int maxPermits) {
if (validMaxPermits(maxPermits) == false) {
throw new IllegalArgumentException("invalid max permits: " + maxPermits);
}
this.segmentName = segmentName;
this.onClose = onClose;
this.arena = Arena.ofShared();
this.state = new AtomicInteger(maxPermits << 16);
}
static boolean validMaxPermits(int v) {
return v > 0 && v <= 0x7FFF;
}
// for debugging
String getSegmentName() {
return segmentName;
}
/**
* Returns true if the ref count has been increased. Otherwise, false if there are no remaining
* acquires.
*/
boolean acquire() {
int value;
while (true) {
value = state.get();
if (value < REMAINING_UNIT) {
return false;
}
if (this.state.compareAndSet(value, value - ACQUIRE_DECREMENT)) {
return true;
}
}
}
/** Decrements the ref count. */
void release() {
int value;
while (true) {
value = state.get();
final int count = value & 0xFFFF;
if (count == 0) {
throw new IllegalStateException(value == CLOSED ? "closed" : "nothing to release");
}
final int newValue = count == 1 ? CLOSED : value - 1;
if (this.state.compareAndSet(value, newValue)) {
if (newValue == CLOSED) {
onClose.run();
arena.close();
}
return;
}
}
}
@Override
public void close() {
release();
}
@Override
public MemorySegment allocate(long byteSize, long byteAlignment) {
throw new UnsupportedOperationException();
}
@Override
public MemorySegment.Scope scope() {
return arena.scope();
}
@Override
public String toString() {
return "RefCountedArena[segmentName="
+ segmentName
+ ", value="
+ state.get()
+ ", arena="
+ arena
+ "]";
}
}

View File

@ -16,14 +16,22 @@
*/
package org.apache.lucene.store;
import static java.util.stream.Collectors.toList;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.lucene.tests.store.BaseDirectoryTestCase;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.NamedThreadFactory;
@ -172,4 +180,153 @@ public class TestMMapDirectory extends BaseDirectoryTestCase {
throw ee.getCause();
}
}
public void testArenas() throws Exception {
Supplier<String> randomGenerationOrNone =
() -> random().nextBoolean() ? "_" + random().nextInt(5) : "";
// First, create a number of segment specific file name lists to test with
var exts =
List.of(
".si", ".cfs", ".cfe", ".dvd", ".dvm", ".nvd", ".nvm", ".fdt", ".vec", ".vex", ".vemf");
var names =
IntStream.range(0, 50)
.mapToObj(i -> "_" + i + randomGenerationOrNone.get())
.flatMap(s -> exts.stream().map(ext -> s + ext))
.collect(toList());
// Second, create a number of non-segment file names
IntStream.range(0, 50).mapToObj(i -> "foo" + i).forEach(names::add);
Collections.shuffle(names, random());
final int size = 6;
byte[] bytes = new byte[size];
random().nextBytes(bytes);
try (var dir = new MMapDirectory(createTempDir("testArenas"))) {
for (var name : names) {
try (IndexOutput out = dir.createOutput(name, IOContext.DEFAULT)) {
out.writeBytes(bytes, 0, bytes.length);
}
}
int nThreads = 10;
int perListSize = (names.size() + nThreads) / nThreads;
List<List<String>> nameLists =
IntStream.range(0, nThreads)
.mapToObj(
i ->
names.subList(
perListSize * i, Math.min(perListSize * i + perListSize, names.size())))
.toList();
var threadFactory = new NamedThreadFactory("testArenas");
try (var executor = Executors.newFixedThreadPool(nThreads, threadFactory)) {
var tasks = nameLists.stream().map(l -> new IndicesOpenTask(l, dir)).toList();
var futures = tasks.stream().map(executor::submit).toList();
for (var future : futures) {
future.get();
}
}
if (!(dir.attachment instanceof ConcurrentHashMap<?, ?> map)) {
throw new AssertionError("unexpected attachment: " + dir.attachment);
}
assertEquals(0, map.size());
}
}
static class IndicesOpenTask implements Callable<Void> {
final List<String> names;
final Directory dir;
IndicesOpenTask(List<String> names, Directory dir) {
this.names = names;
this.dir = dir;
}
@Override
public Void call() throws Exception {
List<IndexInput> closeables = new ArrayList<>();
for (var name : names) {
closeables.add(dir.openInput(name, IOContext.DEFAULT));
}
for (IndexInput closeable : closeables) {
closeable.close();
}
return null;
}
}
// Opens more files in the same group than the ref counting limit.
public void testArenasManySegmentFiles() throws Exception {
var names = IntStream.range(0, 1024).mapToObj(i -> "_001.ext" + i).toList();
final int size = 4;
byte[] bytes = new byte[size];
random().nextBytes(bytes);
try (var dir = new MMapDirectory(createTempDir("testArenasManySegmentFiles"))) {
for (var name : names) {
try (IndexOutput out = dir.createOutput(name, IOContext.DEFAULT)) {
out.writeBytes(bytes, 0, bytes.length);
}
}
List<IndexInput> closeables = new ArrayList<>();
for (var name : names) {
closeables.add(dir.openInput(name, IOContext.DEFAULT));
}
for (IndexInput closeable : closeables) {
closeable.close();
}
if (!(dir.attachment instanceof ConcurrentHashMap<?, ?> map)) {
throw new AssertionError("unexpected attachment: " + dir.attachment);
}
assertEquals(0, map.size());
}
}
public void testGroupBySegmentFunc() {
var func = MMapDirectory.GROUP_BY_SEGMENT;
assertEquals("0", func.apply("_0.doc").orElseThrow());
assertEquals("51", func.apply("_51.si").orElseThrow());
assertEquals("51-g", func.apply("_51_1.si").orElseThrow());
assertEquals("51-g", func.apply("_51_1_gg_ff.si").orElseThrow());
assertEquals("51-g", func.apply("_51_2_gg_ff.si").orElseThrow());
assertEquals("51-g", func.apply("_51_3_gg_ff.si").orElseThrow());
assertEquals("5987654321", func.apply("_5987654321.si").orElseThrow());
assertEquals("f", func.apply("_f.si").orElseThrow());
assertEquals("ff", func.apply("_ff.si").orElseThrow());
assertEquals("51a", func.apply("_51a.si").orElseThrow());
assertEquals("f51a", func.apply("_f51a.si").orElseThrow());
assertEquals("segment", func.apply("_segment.si").orElseThrow());
// old style
assertEquals("5", func.apply("_5_Lucene90FieldsIndex-doc_ids_0.tmp").orElseThrow());
assertFalse(func.apply("").isPresent());
assertFalse(func.apply("_").isPresent());
assertFalse(func.apply("_.si").isPresent());
assertFalse(func.apply("foo").isPresent());
assertFalse(func.apply("_foo").isPresent());
assertFalse(func.apply("__foo").isPresent());
assertFalse(func.apply("_segment").isPresent());
assertFalse(func.apply("segment.si").isPresent());
}
public void testNoGroupingFunc() {
var func = MMapDirectory.NO_GROUPING;
assertFalse(func.apply("_0.doc").isPresent());
assertFalse(func.apply("_0.si").isPresent());
assertFalse(func.apply("_54.si").isPresent());
assertFalse(func.apply("_ff.si").isPresent());
assertFalse(func.apply("_.si").isPresent());
assertFalse(func.apply("foo").isPresent());
assertFalse(func.apply("_foo").isPresent());
assertFalse(func.apply("__foo").isPresent());
assertFalse(func.apply("_segment").isPresent());
assertFalse(func.apply("_segment.si").isPresent());
assertFalse(func.apply("segment.si").isPresent());
assertFalse(func.apply("_51a.si").isPresent());
}
}