Cache#computeIfAbsent loader can throw checked exceptions
This commit is contained in:
parent
c100d18f86
commit
01e7378804
|
@ -23,6 +23,7 @@ import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
@ -276,11 +277,12 @@ public class Cache<K, V> {
|
||||||
* value using the given mapping function and enters it into this map unless null.
|
* value using the given mapping function and enters it into this map unless null.
|
||||||
*
|
*
|
||||||
* @param key the key whose associated value is to be returned or computed for if non-existant
|
* @param key the key whose associated value is to be returned or computed for if non-existant
|
||||||
* @param mappingFunction the function to compute a value given a key
|
* @param loader the function to compute a value given a key
|
||||||
* @return the current (existing or computed) value associated with the specified key, or null if the computed
|
* @return the current (existing or computed) value associated with the specified key, or null if the computed
|
||||||
* value is null
|
* value is null
|
||||||
|
* @throws ExecutionException thrown if loader throws an exception
|
||||||
*/
|
*/
|
||||||
public V computeIfAbsent(K key, Function<K, V> mappingFunction) {
|
public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionException {
|
||||||
long now = now();
|
long now = now();
|
||||||
V value = get(key, now);
|
V value = get(key, now);
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
|
@ -288,7 +290,11 @@ public class Cache<K, V> {
|
||||||
try (ReleasableLock ignored = segment.writeLock.acquire()) {
|
try (ReleasableLock ignored = segment.writeLock.acquire()) {
|
||||||
value = get(key, now);
|
value = get(key, now);
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
value = mappingFunction.apply(key);
|
try {
|
||||||
|
value = loader.load(key);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ExecutionException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
put(key, value, now);
|
put(key, value, now);
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.common.cache;
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface CacheLoader<K, V> {
|
||||||
|
V load(K key) throws Exception;
|
||||||
|
}
|
|
@ -151,7 +151,6 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
|
||||||
});
|
});
|
||||||
|
|
||||||
return filterToFbs.computeIfAbsent(query, key -> {
|
return filterToFbs.computeIfAbsent(query, key -> {
|
||||||
try {
|
|
||||||
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
|
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
|
||||||
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
|
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
|
||||||
searcher.setQueryCache(null);
|
searcher.setQueryCache(null);
|
||||||
|
@ -167,9 +166,6 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
|
||||||
Value value = new Value(bitSet, shardId);
|
Value value = new Value(bitSet, shardId);
|
||||||
listener.onCache(shardId, value.bitset);
|
listener.onCache(shardId, value.bitset);
|
||||||
return value;
|
return value;
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}).bitset;
|
}).bitset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,10 +29,7 @@ import org.elasticsearch.action.search.SearchType;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.cache.Cache;
|
import org.elasticsearch.common.cache.*;
|
||||||
import org.elasticsearch.common.cache.CacheBuilder;
|
|
||||||
import org.elasticsearch.common.cache.RemovalListener;
|
|
||||||
import org.elasticsearch.common.cache.RemovalNotification;
|
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
@ -263,7 +260,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Loader implements Function<Key, Value> {
|
private static class Loader implements CacheLoader<Key, Value> {
|
||||||
|
|
||||||
private final QueryPhase queryPhase;
|
private final QueryPhase queryPhase;
|
||||||
private final SearchContext context;
|
private final SearchContext context;
|
||||||
|
@ -279,7 +276,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Value apply(Key key) {
|
public Value load(Key key) throws Exception {
|
||||||
queryPhase.execute(context);
|
queryPhase.execute(context);
|
||||||
|
|
||||||
/* BytesStreamOutput allows to pass the expected size but by default uses
|
/* BytesStreamOutput allows to pass the expected size but by default uses
|
||||||
|
@ -299,8 +296,6 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis
|
||||||
Value value = new Value(reference, out.ramBytesUsed());
|
Value value = new Value(reference, out.ramBytesUsed());
|
||||||
key.shard.requestCache().onCached(key, value);
|
key.shard.requestCache().onCached(key, value);
|
||||||
return value;
|
return value;
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,12 +152,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
||||||
for (Listener listener : this.listeners) {
|
for (Listener listener : this.listeners) {
|
||||||
k.listeners.add(listener);
|
k.listeners.add(listener);
|
||||||
}
|
}
|
||||||
final AtomicFieldData fieldData;
|
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
|
||||||
try {
|
|
||||||
fieldData = indexFieldData.loadDirect(context);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
for (Listener listener : k.listeners) {
|
for (Listener listener : k.listeners) {
|
||||||
try {
|
try {
|
||||||
listener.onCache(shardId, fieldNames, fieldDataType, fieldData);
|
listener.onCache(shardId, fieldNames, fieldDataType, fieldData);
|
||||||
|
@ -181,12 +176,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
||||||
for (Listener listener : this.listeners) {
|
for (Listener listener : this.listeners) {
|
||||||
k.listeners.add(listener);
|
k.listeners.add(listener);
|
||||||
}
|
}
|
||||||
final Accountable ifd;
|
final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
|
||||||
try {
|
|
||||||
ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
for (Listener listener : k.listeners) {
|
for (Listener listener : k.listeners) {
|
||||||
try {
|
try {
|
||||||
listener.onCache(shardId, fieldNames, fieldDataType, ifd);
|
listener.onCache(shardId, fieldNames, fieldDataType, ifd);
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.junit.Before;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||||
|
@ -448,10 +449,14 @@ public class CacheTests extends ESTestCase {
|
||||||
Thread thread = new Thread(() -> {
|
Thread thread = new Thread(() -> {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
for (int j = 0; j < numberOfEntries; j++) {
|
for (int j = 0; j < numberOfEntries; j++) {
|
||||||
|
try {
|
||||||
cache.computeIfAbsent(j, key -> {
|
cache.computeIfAbsent(j, key -> {
|
||||||
assertTrue(flags.compareAndSet(key, false, true));
|
assertTrue(flags.compareAndSet(key, false, true));
|
||||||
return Integer.toString(key);
|
return Integer.toString(key);
|
||||||
});
|
});
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
threads.add(thread);
|
threads.add(thread);
|
||||||
|
|
Loading…
Reference in New Issue