Unwrap execution exceptions cause and rethrow as is when possible (#12516)

When performing concurrent search, we may get an execution exception
from one or more slices. In that case, we'd like to rethrow the cause of
the execution exception, which we do by wrapping it into a new runtime
exception. Instead, we can rethrow runtime exceptions as-is, and the
same is true for io exceptions. Any other exception is still wrapped
into a new runtime exception. This unifies the exceptions that get
thrown between sequential codepath (when no executor is provided) and
concurrent codepath (when an executor is provided).
This commit is contained in:
Luca Cavanna 2023-09-05 15:55:48 +02:00 committed by GitHub
parent d631615665
commit 947b2c5e5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 115 additions and 20 deletions

View File

@ -186,6 +186,11 @@ Optimizations
* GITHUB##12371: Lazy computation of similarity score during initializeFromGraph (Jack Wang)
Changes in runtime behavior
---------------------
* GITHUB#12516: Unwrap and throw execution exceptions cause when performing concurrent search (Luca Cavanna)
Bug Fixes
---------------------

View File

@ -95,20 +95,17 @@ abstract class AbstractKnnVectorQuery extends Query {
}
private TopDocs[] sequentialSearch(
List<LeafReaderContext> leafReaderContexts, Weight filterWeight) {
try {
TopDocs[] perLeafResults = new TopDocs[leafReaderContexts.size()];
for (LeafReaderContext ctx : leafReaderContexts) {
perLeafResults[ctx.ord] = searchLeaf(ctx, filterWeight);
}
return perLeafResults;
} catch (Exception e) {
throw new RuntimeException(e);
List<LeafReaderContext> leafReaderContexts, Weight filterWeight) throws IOException {
TopDocs[] perLeafResults = new TopDocs[leafReaderContexts.size()];
for (LeafReaderContext ctx : leafReaderContexts) {
perLeafResults[ctx.ord] = searchLeaf(ctx, filterWeight);
}
return perLeafResults;
}
private TopDocs[] parallelSearch(
List<LeafReaderContext> leafReaderContexts, Weight filterWeight, TaskExecutor taskExecutor) {
List<LeafReaderContext> leafReaderContexts, Weight filterWeight, TaskExecutor taskExecutor)
throws IOException {
List<RunnableFuture<TopDocs>> tasks = new ArrayList<>();
for (LeafReaderContext context : leafReaderContexts) {
tasks.add(new FutureTask<>(() -> searchLeaf(context, filterWeight)));

View File

@ -17,6 +17,7 @@
package org.apache.lucene.search;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -25,6 +26,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@ -46,7 +48,7 @@ class TaskExecutor {
* @return a list containing the results from the tasks execution
* @param <T> the return type of the task execution
*/
final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) {
final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOException {
for (Runnable task : tasks) {
executor.execute(task);
}
@ -57,7 +59,7 @@ class TaskExecutor {
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
throw IOUtils.rethrowAlways(e.getCause());
}
}
return results;

View File

@ -224,10 +224,7 @@ abstract class BaseKnnVectorQueryTestCase extends LuceneTestCase {
IndexSearcher searcher = newSearcher(reader);
AbstractKnnVectorQuery kvq = getKnnVectorQuery("field", new float[] {0}, 10);
IllegalArgumentException e =
expectThrows(
RuntimeException.class,
IllegalArgumentException.class,
() -> searcher.search(kvq, 10));
expectThrows(IllegalArgumentException.class, () -> searcher.search(kvq, 10));
assertEquals("vector query dimension: 1 differs from field dimension: 2", e.getMessage());
}
}
@ -535,7 +532,6 @@ abstract class BaseKnnVectorQueryTestCase extends LuceneTestCase {
assertEquals(9, results.totalHits.value);
assertEquals(results.totalHits.value, results.scoreDocs.length);
expectThrows(
RuntimeException.class,
UnsupportedOperationException.class,
() ->
searcher.search(
@ -550,7 +546,6 @@ abstract class BaseKnnVectorQueryTestCase extends LuceneTestCase {
assertEquals(5, results.totalHits.value);
assertEquals(results.totalHits.value, results.scoreDocs.length);
expectThrows(
RuntimeException.class,
UnsupportedOperationException.class,
() ->
searcher.search(
@ -578,7 +573,6 @@ abstract class BaseKnnVectorQueryTestCase extends LuceneTestCase {
// Test a filter that exhausts visitedLimit in upper levels, and switches to exact search
Query filter4 = IntPoint.newRangeQuery("tag", lower, lower + 2);
expectThrows(
RuntimeException.class,
UnsupportedOperationException.class,
() ->
searcher.search(
@ -751,7 +745,6 @@ abstract class BaseKnnVectorQueryTestCase extends LuceneTestCase {
Query filter = new ThrowingBitSetQuery(new FixedBitSet(numDocs));
expectThrows(
RuntimeException.class,
UnsupportedOperationException.class,
() ->
searcher.search(

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.NamedThreadFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class TestTaskExecutor extends LuceneTestCase {
private static ExecutorService executorService;
@BeforeClass
public static void createExecutor() {
executorService =
Executors.newFixedThreadPool(
1, new NamedThreadFactory(TestTaskExecutor.class.getSimpleName()));
}
@AfterClass
public static void shutdownExecutor() {
executorService.shutdown();
}
public void testUnwrapIOExceptionFromExecutionException() {
TaskExecutor taskExecutor = new TaskExecutor(executorService);
FutureTask<?> task =
new FutureTask<>(
() -> {
throw new IOException("io exception");
});
IOException ioException =
expectThrows(
IOException.class, () -> taskExecutor.invokeAll(Collections.singletonList(task)));
assertEquals("io exception", ioException.getMessage());
}
public void testUnwrapRuntimeExceptionFromExecutionException() {
TaskExecutor taskExecutor = new TaskExecutor(executorService);
FutureTask<?> task =
new FutureTask<>(
() -> {
throw new RuntimeException("runtime");
});
RuntimeException runtimeException =
expectThrows(
RuntimeException.class, () -> taskExecutor.invokeAll(Collections.singletonList(task)));
assertEquals("runtime", runtimeException.getMessage());
assertNull(runtimeException.getCause());
}
public void testUnwrapErrorFromExecutionException() {
TaskExecutor taskExecutor = new TaskExecutor(executorService);
FutureTask<?> task =
new FutureTask<>(
() -> {
throw new OutOfMemoryError("oom");
});
OutOfMemoryError outOfMemoryError =
expectThrows(
OutOfMemoryError.class, () -> taskExecutor.invokeAll(Collections.singletonList(task)));
assertEquals("oom", outOfMemoryError.getMessage());
assertNull(outOfMemoryError.getCause());
}
public void testUnwrappedExceptions() {
TaskExecutor taskExecutor = new TaskExecutor(executorService);
FutureTask<?> task =
new FutureTask<>(
() -> {
throw new Exception("exc");
});
RuntimeException runtimeException =
expectThrows(
RuntimeException.class, () -> taskExecutor.invokeAll(Collections.singletonList(task)));
assertEquals("exc", runtimeException.getCause().getMessage());
}
}