arenas) {
+ if (group.isEmpty()) {
+ return Arena.ofShared();
+ }
+
+ String key = group.get();
+ var refCountedArena =
+ arenas.computeIfAbsent(
+ key, s -> new RefCountedSharedArena(s, () -> arenas.remove(s), sharedArenaMaxPermits));
+ if (refCountedArena.acquire()) {
+ return refCountedArena;
+ } else {
+ return arenas.compute(
+ key,
+ (s, v) -> {
+ if (v != null && v.acquire()) {
+ return v;
+ } else {
+ v = new RefCountedSharedArena(s, () -> arenas.remove(s), sharedArenaMaxPermits);
+ v.acquire(); // guaranteed to succeed
+ return v;
+ }
+ });
+ }
+ }
}
diff --git a/lucene/core/src/java21/org/apache/lucene/store/RefCountedSharedArena.java b/lucene/core/src/java21/org/apache/lucene/store/RefCountedSharedArena.java
new file mode 100644
index 00000000000..e89de280a3c
--- /dev/null
+++ b/lucene/core/src/java21/org/apache/lucene/store/RefCountedSharedArena.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A reference counted shared Arena.
+ *
+ * <p>The purpose of this class is to allow a number of mmapped memory segments to be associated
+ * with a single underlying arena in order to avoid closing the underlying arena until all segments
+ * are closed. Typically, these memory segments belong to the same logical group, e.g. individual
+ * files of the same index segment. We do this to avoid the expensive cost of closing a shared
+ * Arena.
+ *
+ * <p>The reference count is increased by {@link #acquire()}, and decreased by {@link #release()}.
+ * When the reference count reaches 0, then the underlying arena is closed and the given {@code
+ * onClose} runnable is executed. No more references can be acquired.
+ *
+ * <p>The total number of acquires that can be obtained for the lifetime of an instance of this
+ * class is limited to the {@code maxPermits} passed to the constructor ({@link
+ * #DEFAULT_MAX_PERMITS}, i.e. 1024, by default). When the total number of acquires is exhausted,
+ * then no more acquires are permitted and {@link #acquire()} returns false. This is independent
+ * of the current ref count.
+ */
+@SuppressWarnings("preview")
+final class RefCountedSharedArena implements Arena {
+
+  /** Default upper bound on the total number of acquires over the lifetime of an instance. */
+  static final int DEFAULT_MAX_PERMITS = 1024;
+
+  // state value once the last reference has been released; acquire() always fails from then on
+  private static final int CLOSED = 0;
+  // number of low bits of the state that hold the current ref count
+  private static final int REF_COUNT_BITS = 16;
+  // mask selecting the current ref count from the state
+  private static final int REF_COUNT_MASK = (1 << REF_COUNT_BITS) - 1; // 0xffff
+  // largest accepted permit count; shifted into the high bits it still yields a positive int
+  private static final int MAX_PERMITS_LIMIT = 0x7FFF;
+  // minimum value, beyond which permits are exhausted (exactly one remaining permit, no refs)
+  private static final int REMAINING_UNIT = 1 << REF_COUNT_BITS;
+  // acquire decrement; subtracting it removes one permit (high bits) and adds one ref (low bits)
+  private static final int ACQUIRE_DECREMENT = REMAINING_UNIT - 1; // 0xffff
+
+  private final String segmentName;
+  private final Runnable onClose;
+  private final Arena arena;
+
+  // high 16 bits contain the total remaining acquires; monotonically decreasing
+  // low 16 bit contain the current ref count
+  private final AtomicInteger state;
+
+  /**
+   * Creates an instance that allows {@link #DEFAULT_MAX_PERMITS} acquires in total.
+   *
+   * @param segmentName name reported by {@link #getSegmentName()} and {@link #toString()}
+   * @param onClose callback run once, when the ref count drops back to zero
+   */
+  RefCountedSharedArena(String segmentName, Runnable onClose) {
+    this(segmentName, onClose, DEFAULT_MAX_PERMITS);
+  }
+
+  /**
+   * Creates an instance backed by a new shared arena.
+   *
+   * @param segmentName name reported by {@link #getSegmentName()} and {@link #toString()}
+   * @param onClose callback run once, when the ref count drops back to zero
+   * @param maxPermits total number of acquires permitted over the lifetime of this instance
+   * @throws IllegalArgumentException if {@code maxPermits} is rejected by {@link
+   *     #validMaxPermits(int)}
+   */
+  RefCountedSharedArena(String segmentName, Runnable onClose, int maxPermits) {
+    if (validMaxPermits(maxPermits) == false) {
+      throw new IllegalArgumentException("invalid max permits: " + maxPermits);
+    }
+    this.segmentName = segmentName;
+    this.onClose = onClose;
+    this.arena = Arena.ofShared();
+    // all permits available, ref count zero
+    this.state = new AtomicInteger(maxPermits << REF_COUNT_BITS);
+  }
+
+  /** Returns true if {@code v} is in the supported range {@code 1..0x7FFF}. */
+  static boolean validMaxPermits(int v) {
+    return v > 0 && v <= MAX_PERMITS_LIMIT;
+  }
+
+  // for debugging
+  String getSegmentName() {
+    return segmentName;
+  }
+
+  /**
+   * Returns true if the ref count has been increased. Otherwise, false if there are no remaining
+   * acquires.
+   */
+  boolean acquire() {
+    while (true) {
+      final int value = state.get();
+      if (value < REMAINING_UNIT) {
+        // permits exhausted, or already closed
+        return false;
+      }
+      if (state.compareAndSet(value, value - ACQUIRE_DECREMENT)) {
+        return true;
+      }
+      // lost a race with another acquire/release; re-read and retry
+    }
+  }
+
+  /**
+   * Decrements the ref count. When the count drops to zero the state becomes closed, the {@code
+   * onClose} callback is run and the underlying arena is closed.
+   *
+   * @throws IllegalStateException if the ref count is already zero
+   */
+  void release() {
+    while (true) {
+      final int value = state.get();
+      final int count = value & REF_COUNT_MASK;
+      if (count == 0) {
+        throw new IllegalStateException(value == CLOSED ? "closed" : "nothing to release");
+      }
+      // releasing the last reference discards any remaining permits as well
+      final int newValue = count == 1 ? CLOSED : value - 1;
+      if (state.compareAndSet(value, newValue)) {
+        if (newValue == CLOSED) {
+          try {
+            onClose.run();
+          } finally {
+            // the state is already CLOSED, so no later call could close the arena: make sure
+            // it is closed even if the callback throws
+            arena.close();
+          }
+        }
+        return;
+      }
+    }
+  }
+
+  /** Releases one reference; equivalent to {@link #release()}. */
+  @Override
+  public void close() {
+    release();
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public MemorySegment allocate(long byteSize, long byteAlignment) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns the scope of the underlying shared arena. */
+  @Override
+  public MemorySegment.Scope scope() {
+    return arena.scope();
+  }
+
+  @Override
+  public String toString() {
+    return "RefCountedArena[segmentName="
+        + segmentName
+        + ", value="
+        + state.get()
+        + ", arena="
+        + arena
+        + "]";
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java
index f7c49c9b661..d01d6ec50eb 100644
--- a/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java
+++ b/lucene/core/src/test/org/apache/lucene/store/TestMMapDirectory.java
@@ -16,14 +16,22 @@
*/
package org.apache.lucene.store;
+import static java.util.stream.Collectors.toList;
+
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
import org.apache.lucene.tests.store.BaseDirectoryTestCase;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.NamedThreadFactory;
@@ -172,4 +180,153 @@ public class TestMMapDirectory extends BaseDirectoryTestCase {
throw ee.getCause();
}
}
+
+  /**
+   * Writes a mix of segment-style and non-segment file names, opens and closes all of them
+   * concurrently from several threads, and asserts that the directory's attachment map is empty
+   * afterwards (presumably the per-group arena map; confirm against MMapDirectory).
+   */
+  public void testArenas() throws Exception {
+    Supplier<String> randomGenerationOrNone =
+        () -> random().nextBoolean() ? "_" + random().nextInt(5) : "";
+    // First, create a number of segment specific file name lists to test with
+    var exts =
+        List.of(
+            ".si", ".cfs", ".cfe", ".dvd", ".dvm", ".nvd", ".nvm", ".fdt", ".vec", ".vex", ".vemf");
+    // the list is appended to and shuffled below, so it must be mutable
+    var names =
+        IntStream.range(0, 50)
+            .mapToObj(i -> "_" + i + randomGenerationOrNone.get())
+            .flatMap(s -> exts.stream().map(ext -> s + ext))
+            .collect(toList());
+    // Second, create a number of non-segment file names
+    IntStream.range(0, 50).mapToObj(i -> "foo" + i).forEach(names::add);
+    Collections.shuffle(names, random());
+
+    final int size = 6;
+    byte[] bytes = new byte[size];
+    random().nextBytes(bytes);
+
+    try (var dir = new MMapDirectory(createTempDir("testArenas"))) {
+      for (var name : names) {
+        try (IndexOutput out = dir.createOutput(name, IOContext.DEFAULT)) {
+          out.writeBytes(bytes, 0, bytes.length);
+        }
+      }
+
+      int nThreads = 10;
+      // sized so that nThreads consecutive slices cover every name
+      int perListSize = (names.size() + nThreads) / nThreads;
+      List<List<String>> nameLists =
+          IntStream.range(0, nThreads)
+              .mapToObj(
+                  i ->
+                      names.subList(
+                          perListSize * i, Math.min(perListSize * i + perListSize, names.size())))
+              .toList();
+
+      var threadFactory = new NamedThreadFactory("testArenas");
+      try (var executor = Executors.newFixedThreadPool(nThreads, threadFactory)) {
+        var tasks = nameLists.stream().map(l -> new IndicesOpenTask(l, dir)).toList();
+        var futures = tasks.stream().map(executor::submit).toList();
+        // propagate any failure from the worker threads
+        for (var future : futures) {
+          future.get();
+        }
+      }
+
+      if (!(dir.attachment instanceof ConcurrentHashMap<?, ?> map)) {
+        throw new AssertionError("unexpected attachment: " + dir.attachment);
+      }
+      assertEquals(0, map.size());
+    }
+  }
+
+  /** Task that opens every named file of a directory as an input and then closes them all. */
+  static class IndicesOpenTask implements Callable<Void> {
+    final List<String> names;
+    final Directory dir;
+
+    /**
+     * @param names names of the files to open
+     * @param dir directory the files are opened from
+     */
+    IndicesOpenTask(List<String> names, Directory dir) {
+      this.names = names;
+      this.dir = dir;
+    }
+
+    /** Opens all files first, so they are held open simultaneously, then closes them in order. */
+    @Override
+    public Void call() throws Exception {
+      List<IndexInput> inputs = new ArrayList<>(names.size());
+      try {
+        for (var name : names) {
+          inputs.add(dir.openInput(name, IOContext.DEFAULT));
+        }
+      } finally {
+        // also runs when an openInput call fails, so inputs opened before it are not leaked
+        for (IndexInput input : inputs) {
+          input.close();
+        }
+      }
+      return null;
+    }
+  }
+
+  // Opens many files whose names share the "_001" prefix (presumably one segment group), keeps
+  // them all open at once, then closes them and asserts the attachment map is empty.
+  // NOTE(review): the stated intent is "more files in the same group than the ref counting
+  // limit", but 1024 files only equals RefCountedSharedArena.DEFAULT_MAX_PERMITS; exceeding the
+  // default would need at least 1025 files - confirm the limit this directory is configured with.
+  public void testArenasManySegmentFiles() throws Exception {
+    var names = IntStream.range(0, 1024).mapToObj(i -> "_001.ext" + i).toList();
+
+    final int size = 4;
+    byte[] bytes = new byte[size];
+    random().nextBytes(bytes);
+
+    try (var dir = new MMapDirectory(createTempDir("testArenasManySegmentFiles"))) {
+      for (var name : names) {
+        try (IndexOutput out = dir.createOutput(name, IOContext.DEFAULT)) {
+          out.writeBytes(bytes, 0, bytes.length);
+        }
+      }
+
+      List<IndexInput> closeables = new ArrayList<>();
+      for (var name : names) {
+        closeables.add(dir.openInput(name, IOContext.DEFAULT));
+      }
+      for (IndexInput closeable : closeables) {
+        closeable.close();
+      }
+
+      if (!(dir.attachment instanceof ConcurrentHashMap<?, ?> map)) {
+        throw new AssertionError("unexpected attachment: " + dir.attachment);
+      }
+      assertEquals(0, map.size());
+    }
+  }
+
+  /** Checks the group key GROUP_BY_SEGMENT yields for file names, and the names it rejects. */
+  public void testGroupBySegmentFunc() {
+    var func = MMapDirectory.GROUP_BY_SEGMENT;
+
+    // each row: {file name, expected group key}
+    String[][] grouped = {
+      {"_0.doc", "0"},
+      {"_51.si", "51"},
+      {"_51_1.si", "51-g"},
+      {"_51_1_gg_ff.si", "51-g"},
+      {"_51_2_gg_ff.si", "51-g"},
+      {"_51_3_gg_ff.si", "51-g"},
+      {"_5987654321.si", "5987654321"},
+      {"_f.si", "f"},
+      {"_ff.si", "ff"},
+      {"_51a.si", "51a"},
+      {"_f51a.si", "f51a"},
+      {"_segment.si", "segment"},
+      // old style
+      {"_5_Lucene90FieldsIndex-doc_ids_0.tmp", "5"},
+    };
+    for (String[] row : grouped) {
+      assertEquals(row[1], func.apply(row[0]).orElseThrow());
+    }
+
+    // names for which no group is returned
+    var ungrouped = List.of("", "_", "_.si", "foo", "_foo", "__foo", "_segment", "segment.si");
+    for (String fileName : ungrouped) {
+      assertFalse(func.apply(fileName).isPresent());
+    }
+  }
+
+  /** Checks that NO_GROUPING returns no group for any of a range of file names. */
+  public void testNoGroupingFunc() {
+    var func = MMapDirectory.NO_GROUPING;
+    var fileNames =
+        List.of(
+            "_0.doc",
+            "_0.si",
+            "_54.si",
+            "_ff.si",
+            "_.si",
+            "foo",
+            "_foo",
+            "__foo",
+            "_segment",
+            "_segment.si",
+            "segment.si",
+            "_51a.si");
+    for (String fileName : fileNames) {
+      assertFalse(func.apply(fileName).isPresent());
+    }
+  }
}