Add test that the default codec parallelizes I/O. (#13579)

This adds a Directory wrapper that counts how many times we wait for I/O to
complete before doing something else, and adds tests that the default codec is
able to parallelize I/O for stored fields retrieval and term lookups.
This commit is contained in:
Adrien Grand 2024-07-31 17:11:44 +02:00 committed by GitHub
parent 30c965ea57
commit 5226e282b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 415 additions and 0 deletions

View File

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.store.SerialIOCountingDirectory;
import org.apache.lucene.tests.util.LineFileDocs;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOBooleanSupplier;
import org.apache.lucene.util.IOUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/** Verifies that the default codec is able to parallelize I/O for term lookups and stored fields. */
public class TestDefaultCodecParallelizesIO extends LuceneTestCase {

  private static SerialIOCountingDirectory countingDir;
  private static IndexReader indexReader;

  /** Build a single-segment index with the default codec, then wrap it with an I/O counter. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    Directory ramDir = new ByteBuffersDirectory();
    try (LineFileDocs lineDocs = new LineFileDocs(random());
        IndexWriter writer =
            new IndexWriter(ramDir, new IndexWriterConfig().setCodec(TestUtil.getDefaultCodec()))) {
      final int docCount = atLeast(10_000);
      for (int i = 0; i < docCount; ++i) {
        Document doc = lineDocs.nextDoc();
        writer.addDocument(doc);
      }
      // A single segment keeps the I/O pattern deterministic across test methods.
      writer.forceMerge(1);
    }
    countingDir = new SerialIOCountingDirectory(ramDir);
    indexReader = DirectoryReader.open(countingDir);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    IOUtils.close(indexReader, countingDir);
  }

  /** Simulate term lookup in a BooleanQuery. */
  public void testTermsSeekExact() throws IOException {
    long countBefore = countingDir.count();
    Terms terms = getOnlyLeafReader(indexReader).terms("body");
    String[] queryTerms = new String[] {"a", "which", "the", "for", "he"};
    IOBooleanSupplier[] seeks = new IOBooleanSupplier[queryTerms.length];
    // Prepare all the seeks up-front so the codec has a chance to issue the I/O in parallel.
    for (int i = 0; i < queryTerms.length; ++i) {
      TermsEnum termsEnum = terms.iterator();
      seeks[i] = termsEnum.prepareSeekExact(new BytesRef(queryTerms[i]));
    }
    int resolvedSeeks = 0;
    for (IOBooleanSupplier seek : seeks) {
      if (seek != null) {
        resolvedSeeks++;
        seek.get();
      }
    }
    assertTrue(resolvedSeeks > 0);
    long countAfter = countingDir.count();
    // Some I/O happened, but strictly fewer serial waits than seeks: lookups overlapped.
    assertTrue(countAfter - countBefore > 0);
    assertTrue(countAfter - countBefore < resolvedSeeks);
  }

  /** Simulate stored fields retrieval. */
  public void testStoredFields() throws IOException {
    long countBefore = countingDir.count();
    LeafReader leaf = getOnlyLeafReader(indexReader);
    StoredFields storedFields = leaf.storedFields();
    int[] docIDs = new int[20];
    // Prefetch every document before retrieving any of them.
    for (int i = 0; i < docIDs.length; ++i) {
      docIDs[i] = random().nextInt(leaf.maxDoc());
      storedFields.prefetch(docIDs[i]);
    }
    for (int docID : docIDs) {
      storedFields.document(docID);
    }
    long countAfter = countingDir.count();
    // Some I/O happened, but strictly fewer serial waits than documents: retrieval overlapped.
    assertTrue(countAfter - countBefore > 0);
    assertTrue(countAfter - countBefore < docIDs.length);
  }
}

View File

@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.tests.store;
import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;
import org.apache.lucene.internal.hppc.LongHashSet;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ReadAdvice;
import org.apache.lucene.util.CloseableThreadLocal;
/**
 * A {@link Directory} wrapper that counts the number of times that Lucene may wait for I/O to
 * return serially. Lower counts mean that Lucene better takes advantage of I/O parallelism.
 */
public class SerialIOCountingDirectory extends FilterDirectory {

  // Storage is modeled as fixed-size 4096-byte pages; counting is done at page granularity.
  private static final long PAGE_SHIFT = 12; // 4096 bytes per page
  // Assumed number of pages that are read ahead
  private static final int PAGE_READAHEAD = 4;
  // Total number of serial I/O waits observed so far, across all inputs opened from this
  // directory. LongAdder keeps concurrent increments cheap.
  private final LongAdder counter = new LongAdder();
  // Per-thread flag: true while prefetch() calls have been issued that no read has consumed yet.
  // Consecutive prefetches with no read in-between are counted as a single wait, since those I/O
  // requests can be performed in parallel.
  private final CloseableThreadLocal<Boolean> pendingFetch =
      new CloseableThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
          return Boolean.FALSE;
        }
      };

  /** Sole constructor. */
  public SerialIOCountingDirectory(Directory in) {
    super(in);
  }

  @Override
  public void close() throws IOException {
    pendingFetch.close();
    super.close();
  }

  /** Return the number of I/O requests performed serially. */
  public long count() {
    return counter.sum();
  }

  @Override
  public ChecksumIndexInput openChecksumInput(String name) throws IOException {
    // sequential access, count 1 for the whole file
    counter.increment();
    return super.openChecksumInput(name);
  }

  @Override
  public IndexInput openInput(String name, IOContext context) throws IOException {
    if (context.readAdvice() == ReadAdvice.RANDOM_PRELOAD) {
      // expected to be loaded in memory, only count 1 at open time
      counter.increment();
      return super.openInput(name, context);
    }
    return new SerializedIOCountingIndexInput(super.openInput(name, context), context.readAdvice());
  }

  /**
   * An {@link IndexInput} that increments the directory-wide counter whenever a read touches a
   * page that was neither prefetched nor covered by the assumed OS read-ahead window.
   */
  private class SerializedIOCountingIndexInput extends IndexInput {

    private final IndexInput in;
    // Slice bounds relative to the underlying input, so page numbers are absolute within the file.
    private final long sliceOffset, sliceLength;
    private final ReadAdvice readAdvice;
    // Pages covered by the current batch of prefetch() calls; reading them is not a serial wait.
    private final LongHashSet pendingPages = new LongHashSet();
    // Page of the most recently read byte; MIN_VALUE means nothing has been read yet.
    private long currentPage = Long.MIN_VALUE;

    public SerializedIOCountingIndexInput(IndexInput in, ReadAdvice readAdvice) {
      this(in, readAdvice, 0L, in.length());
    }

    public SerializedIOCountingIndexInput(
        IndexInput in, ReadAdvice readAdvice, long offset, long length) {
      super(in.toString());
      this.in = in;
      this.sliceOffset = offset;
      this.sliceLength = length;
      this.readAdvice = readAdvice;
    }

    /**
     * Record a read of {@code len} bytes at slice-relative {@code offset}. Each touched page
     * counts as one serial wait unless it was prefetched, or lies in the free window
     * [currentPage, currentPage + read-ahead] (read-ahead is 0 pages for {@link
     * ReadAdvice#RANDOM}). Any read closes the current prefetch batch.
     */
    private void onRead(long offset, int len) {
      if (len == 0) {
        return;
      }
      final long firstPage = (sliceOffset + offset) >> PAGE_SHIFT;
      final long lastPage = (sliceOffset + offset + len - 1) >> PAGE_SHIFT;
      for (long page = firstPage; page <= lastPage; ++page) {
        long readAheadUpto;
        if (readAdvice == ReadAdvice.RANDOM) {
          // RANDOM advice disables read-ahead: only the current page is free to re-read.
          readAheadUpto = currentPage;
        } else {
          // Assume that the next few pages are always free to read thanks to read-ahead.
          readAheadUpto = currentPage + PAGE_READAHEAD;
        }
        if (pendingPages.contains(page) == false && (page < currentPage || page > readAheadUpto)) {
          counter.increment();
        }
        currentPage = page;
      }
      // The next prefetch() after a read starts a new batch and will count again.
      pendingFetch.set(false);
    }

    @Override
    public void prefetch(long offset, long length) throws IOException {
      final long firstPage = (sliceOffset + offset) >> PAGE_SHIFT;
      final long lastPage = (sliceOffset + offset + length - 1) >> PAGE_SHIFT;
      long readAheadUpto;
      if (readAdvice == ReadAdvice.RANDOM) {
        readAheadUpto = currentPage;
      } else {
        // Assume that the next few pages are always free to read thanks to read-ahead.
        readAheadUpto = currentPage + PAGE_READAHEAD;
      }
      if (firstPage >= currentPage && lastPage <= readAheadUpto) {
        // seeking within the current (or next page if ReadAdvice.NORMAL) doesn't increment the
        // counter
      } else if (pendingFetch.get() == false) {
        // If multiple prefetch calls are performed without a readXXX() call in-between, count a
        // single increment as these I/O requests can be performed in parallel.
        counter.increment();
        pendingPages.clear();
        pendingFetch.set(true);
      }
      // Remember the prefetched pages so that later reads of them don't count as serial waits.
      for (long page = firstPage; page <= lastPage; ++page) {
        pendingPages.add(page);
      }
    }

    @Override
    public byte readByte() throws IOException {
      onRead(getFilePointer(), Byte.BYTES);
      return in.readByte();
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
      onRead(getFilePointer(), len);
      in.readBytes(b, offset, len);
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    @Override
    public long getFilePointer() {
      // Expose a slice-relative file pointer, as IndexInput slices conventionally do.
      return in.getFilePointer() - sliceOffset;
    }

    @Override
    public void seek(long pos) throws IOException {
      in.seek(sliceOffset + pos);
    }

    @Override
    public long length() {
      return sliceLength;
    }

    @Override
    public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
      // Slices inherit this input's read advice.
      return slice(sliceDescription, offset, length, readAdvice);
    }

    @Override
    public IndexInput slice(
        String sliceDescription, long offset, long length, ReadAdvice readAdvice)
        throws IOException {
      if (offset < 0 || offset + length > sliceLength) {
        throw new IllegalArgumentException();
      }
      IndexInput clone = in.clone();
      clone.seek(sliceOffset + offset);
      // NOTE: the slice starts with fresh currentPage/pendingPages state — counting is tracked
      // per IndexInput instance, not per underlying file.
      return new SerializedIOCountingIndexInput(clone, readAdvice, sliceOffset + offset, length);
    }

    @Override
    public IndexInput clone() {
      IndexInput clone = in.clone();
      // Like slice(), a clone gets fresh page-tracking state but shares the directory counter.
      return new SerializedIOCountingIndexInput(clone, readAdvice, sliceOffset, sliceLength);
    }
  }
}

View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.tests.store;
import java.io.IOException;
import java.nio.file.Path;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.ReadAdvice;
/** Unit tests for {@link SerialIOCountingDirectory}. */
public class TestSerializedIOCountingDirectory extends BaseDirectoryTestCase {

  @Override
  protected Directory getDirectory(Path path) throws IOException {
    return new SerialIOCountingDirectory(FSDirectory.open(path));
  }

  /** Writes a 10-page (40,960-byte) file named "test" into {@code dir}. */
  private static void writeTenPages(Directory dir) throws IOException {
    try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) {
      for (int page = 0; page < 10; ++page) {
        out.writeBytes(new byte[4096], 4096);
      }
    }
  }

  public void testSequentialReads() throws IOException {
    try (SerialIOCountingDirectory dir = new SerialIOCountingDirectory(newDirectory())) {
      writeTenPages(dir);
      try (IndexInput in =
          dir.openInput("test", IOContext.DEFAULT.withReadAdvice(ReadAdvice.NORMAL))) {
        in.readByte();
        long countAfterFirstByte = dir.count();
        while (in.getFilePointer() < in.length()) {
          in.readByte();
        }
        // Sequential reads are free with the normal advice
        assertEquals(countAfterFirstByte, dir.count());
      }
      try (IndexInput in =
          dir.openInput("test", IOContext.DEFAULT.withReadAdvice(ReadAdvice.RANDOM))) {
        in.readByte();
        long countAfterFirstByte = dir.count();
        while (in.getFilePointer() < in.length()) {
          in.readByte();
        }
        // But not with the random advice
        assertFalse(countAfterFirstByte == dir.count());
      }
    }
  }

  public void testParallelReads() throws IOException {
    try (SerialIOCountingDirectory dir = new SerialIOCountingDirectory(newDirectory())) {
      writeTenPages(dir);
      try (IndexInput in =
          dir.openInput("test", IOContext.DEFAULT.withReadAdvice(ReadAdvice.RANDOM))) {
        long baseline = dir.count();
        // count is incremented on the first prefetch
        in.prefetch(5_000, 1);
        assertEquals(baseline + 1, dir.count());
        baseline = dir.count();
        // but not on the second one since it can be performed in parallel
        in.prefetch(10_000, 1);
        assertEquals(baseline, dir.count());
        // and reading from a prefetched page doesn't increment the counter
        in.seek(5_000);
        in.readByte();
        assertEquals(baseline, dir.count());
        in.seek(10_000);
        in.readByte();
        assertEquals(baseline, dir.count());
        // reading data on a page that was not prefetched increments the counter
        in.seek(15_000);
        in.readByte();
        assertEquals(baseline + 1, dir.count());
      }
    }
  }
}