Improve robustness of Query Result serializations (#54692) (#55028)

Makes query result serialization more robust by propagating possible
IOExceptions that can occur during shard level result serialization to the
caller instead of throwing AssertionError that is not intercepted.

Fixes #54665
This commit is contained in:
Igor Motov 2020-04-10 10:29:01 -04:00 committed by GitHub
parent 17101d86d9
commit da976d247f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 21 additions and 15 deletions

View File

@ -808,7 +808,7 @@ public abstract class StreamOutput extends OutputStream {
if (writer != null) {
writer.write(this, value);
} else {
throw new IOException("can not write type [" + type + "]");
throw new IllegalArgumentException("can not write type [" + type + "]");
}
}

View File

@ -28,6 +28,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
@ -43,6 +44,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -115,8 +117,8 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
// NORELEASE The cacheKeyRenderer has been added in order to debug
// https://github.com/elastic/elasticsearch/issues/32827, it should be
// removed when this issue is solved
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
BytesReference getOrCompute(CacheEntity cacheEntity, CheckedSupplier<BytesReference, IOException> loader,
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
assert reader.getReaderCacheHelper() != null;
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
@ -157,10 +159,10 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
private static class Loader implements CacheLoader<Key, BytesReference> {
private final CacheEntity entity;
private final Supplier<BytesReference> loader;
private final CheckedSupplier<BytesReference, IOException> loader;
private boolean loaded;
Loader(CacheEntity entity, Supplier<BytesReference> loader) {
Loader(CacheEntity entity, CheckedSupplier<BytesReference, IOException> loader) {
this.entity = entity;
this.loader = loader;
}

View File

@ -45,7 +45,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
@ -1376,12 +1378,7 @@ public class IndicesService extends AbstractLifecycleComponent
() -> "Shard: " + request.shardId() + "\nSource:\n" + request.source(),
out -> {
queryPhase.execute(context);
try {
context.queryResult().writeToNoId(out);
} catch (IOException e) {
throw new AssertionError("Could not serialize response", e);
}
context.queryResult().writeToNoId(out);
loadedFromCache[0] = false;
});
@ -1420,9 +1417,9 @@ public class IndicesService extends AbstractLifecycleComponent
* @return the contents of the cache or the result of calling the loader
*/
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey,
Supplier<String> cacheKeyRenderer, Consumer<StreamOutput> loader) throws Exception {
Supplier<String> cacheKeyRenderer, CheckedConsumer<StreamOutput, IOException> loader) throws Exception {
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
Supplier<BytesReference> supplier = () -> {
CheckedSupplier<BytesReference, IOException> supplier = () -> {
/* BytesStreamOutput allows to pass the expected size but by default uses
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful

View File

@ -349,6 +349,13 @@ public class BytesStreamsTests extends ESTestCase {
assertThat(jdt.getZonedDateTime().toInstant().toEpochMilli(), equalTo(123456L));
assertThat(jdt.getZonedDateTime().getZone(), equalTo(ZoneId.of("America/Los_Angeles")));
assertEquals(0, in.available());
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> out.writeGenericValue(new Object() {
@Override
public String toString() {
return "This object cannot be serialized by writeGeneric method";
}
}));
assertThat(ex.getMessage(), containsString("can not write type"));
in.close();
out.close();
}

View File

@ -32,6 +32,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.bytes.AbstractBytesReference;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -49,7 +50,6 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class IndicesRequestCacheTests extends ESTestCase {
@ -331,7 +331,7 @@ public class IndicesRequestCacheTests extends ESTestCase {
StringField.TYPE_STORED));
}
private static class Loader implements Supplier<BytesReference> {
private static class Loader implements CheckedSupplier<BytesReference, IOException> {
private final DirectoryReader reader;
private final int id;