[FIELDDATA] Use KeyedLock in IndexFieldDataService
Today we synchronize when updating the IndexFieldDataService datastructures. This might unnecessarily block progress if multiple request need different fielddata instance for different fields. This commit also fixes clear calls to actually consistently clear the caches in the case of an exception. Closes #6855
This commit is contained in:
parent
e30176cc69
commit
90ea4610c8
|
@ -130,17 +130,37 @@ public final class ExceptionsHelper {
|
||||||
public static <T extends Throwable> void rethrowAndSuppress(List<T> exceptions) throws T {
|
public static <T extends Throwable> void rethrowAndSuppress(List<T> exceptions) throws T {
|
||||||
T main = null;
|
T main = null;
|
||||||
for (T ex : exceptions) {
|
for (T ex : exceptions) {
|
||||||
if (main == null) {
|
main = useOrSupress(main, ex);
|
||||||
main = ex;
|
|
||||||
} else {
|
|
||||||
main.addSuppressed(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (main != null) {
|
if (main != null) {
|
||||||
throw main;
|
throw main;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Throws a runtime exception with all given exceptions added as suppressed.
|
||||||
|
* If the given list is empty no exception is thrown
|
||||||
|
*/
|
||||||
|
public static <T extends Throwable> void maybeThrowRuntimeAndSuppress(List<T> exceptions) {
|
||||||
|
T main = null;
|
||||||
|
for (T ex : exceptions) {
|
||||||
|
main = useOrSupress(main, ex);
|
||||||
|
}
|
||||||
|
if (main != null) {
|
||||||
|
throw new ElasticsearchException(main.getMessage(), main);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends Throwable> T useOrSupress(T first, T second) {
|
||||||
|
if (first == null) {
|
||||||
|
return second;
|
||||||
|
} else {
|
||||||
|
first.addSuppressed(second);
|
||||||
|
}
|
||||||
|
return first;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static <T extends Throwable> T unwrap(Throwable t, Class<T> clazz) {
|
public static <T extends Throwable> T unwrap(Throwable t, Class<T> clazz) {
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
do {
|
do {
|
||||||
|
|
|
@ -22,12 +22,14 @@ package org.elasticsearch.index.fielddata;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||||
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.common.collect.MapBuilder;
|
import org.elasticsearch.common.collect.MapBuilder;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
|
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||||
import org.elasticsearch.index.AbstractIndexComponent;
|
import org.elasticsearch.index.AbstractIndexComponent;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder;
|
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsBuilder;
|
||||||
|
@ -43,6 +45,9 @@ import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
|
||||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
|
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
@ -133,6 +138,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
|
||||||
|
|
||||||
private final IndicesFieldDataCache indicesFieldDataCache;
|
private final IndicesFieldDataCache indicesFieldDataCache;
|
||||||
private final ConcurrentMap<String, IndexFieldData<?>> loadedFieldData = ConcurrentCollections.newConcurrentMap();
|
private final ConcurrentMap<String, IndexFieldData<?>> loadedFieldData = ConcurrentCollections.newConcurrentMap();
|
||||||
|
private final KeyedLock.GlobalLockable<String> fieldLoadingLock = new KeyedLock.GlobalLockable<>();
|
||||||
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap(); // no need for concurrency support, always used under lock
|
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap(); // no need for concurrency support, always used under lock
|
||||||
|
|
||||||
IndexService indexService;
|
IndexService indexService;
|
||||||
|
@ -162,36 +168,67 @@ public class IndexFieldDataService extends AbstractIndexComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
public void clear() {
|
||||||
synchronized (loadedFieldData) {
|
fieldLoadingLock.globalLock().lock();
|
||||||
for (IndexFieldData<?> fieldData : loadedFieldData.values()) {
|
try {
|
||||||
|
List<Throwable> exceptions = new ArrayList<>(0);
|
||||||
|
final Collection<IndexFieldData<?>> fieldDataValues = loadedFieldData.values();
|
||||||
|
for (IndexFieldData<?> fieldData : fieldDataValues) {
|
||||||
|
try {
|
||||||
fieldData.clear();
|
fieldData.clear();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
exceptions.add(t);
|
||||||
}
|
}
|
||||||
loadedFieldData.clear();
|
}
|
||||||
for (IndexFieldDataCache cache : fieldDataCaches.values()) {
|
fieldDataValues.clear();
|
||||||
|
final Collection<IndexFieldDataCache> fieldDataCacheValues = fieldDataCaches.values();
|
||||||
|
for (IndexFieldDataCache cache : fieldDataCacheValues) {
|
||||||
|
try {
|
||||||
cache.clear();
|
cache.clear();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
exceptions.add(t);
|
||||||
}
|
}
|
||||||
fieldDataCaches.clear();
|
}
|
||||||
|
fieldDataCacheValues.clear();
|
||||||
|
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
|
||||||
|
} finally {
|
||||||
|
fieldLoadingLock.globalLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clearField(String fieldName) {
|
public void clearField(final String fieldName) {
|
||||||
synchronized (loadedFieldData) {
|
fieldLoadingLock.acquire(fieldName);
|
||||||
IndexFieldData<?> fieldData = loadedFieldData.remove(fieldName);
|
try {
|
||||||
|
List<Throwable> exceptions = new ArrayList<>(0);
|
||||||
|
final IndexFieldData<?> fieldData = loadedFieldData.remove(fieldName);
|
||||||
if (fieldData != null) {
|
if (fieldData != null) {
|
||||||
|
try {
|
||||||
fieldData.clear();
|
fieldData.clear();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
exceptions.add(t);
|
||||||
}
|
}
|
||||||
IndexFieldDataCache cache = fieldDataCaches.remove(fieldName);
|
}
|
||||||
|
final IndexFieldDataCache cache = fieldDataCaches.remove(fieldName);
|
||||||
if (cache != null) {
|
if (cache != null) {
|
||||||
|
try {
|
||||||
cache.clear();
|
cache.clear();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
exceptions.add(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
|
||||||
|
} finally {
|
||||||
|
fieldLoadingLock.release(fieldName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onMappingUpdate() {
|
public void onMappingUpdate() {
|
||||||
// synchronize to make sure to not miss field data instances that are being loaded
|
// synchronize to make sure to not miss field data instances that are being loaded
|
||||||
synchronized (loadedFieldData) {
|
fieldLoadingLock.globalLock().lock();
|
||||||
|
try {
|
||||||
// important: do not clear fieldDataCaches: the cache may be reused
|
// important: do not clear fieldDataCaches: the cache may be reused
|
||||||
loadedFieldData.clear();
|
loadedFieldData.clear();
|
||||||
|
} finally {
|
||||||
|
fieldLoadingLock.globalLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,10 +240,12 @@ public class IndexFieldDataService extends AbstractIndexComponent {
|
||||||
throw new ElasticsearchIllegalArgumentException("found no fielddata type for field [" + fieldNames.fullName() + "]");
|
throw new ElasticsearchIllegalArgumentException("found no fielddata type for field [" + fieldNames.fullName() + "]");
|
||||||
}
|
}
|
||||||
final boolean docValues = mapper.hasDocValues();
|
final boolean docValues = mapper.hasDocValues();
|
||||||
IndexFieldData<?> fieldData = loadedFieldData.get(fieldNames.indexName());
|
final String key = fieldNames.indexName();
|
||||||
|
IndexFieldData<?> fieldData = loadedFieldData.get(key);
|
||||||
if (fieldData == null) {
|
if (fieldData == null) {
|
||||||
synchronized (loadedFieldData) {
|
fieldLoadingLock.acquire(key);
|
||||||
fieldData = loadedFieldData.get(fieldNames.indexName());
|
try {
|
||||||
|
fieldData = loadedFieldData.get(key);
|
||||||
if (fieldData == null) {
|
if (fieldData == null) {
|
||||||
IndexFieldData.Builder builder = null;
|
IndexFieldData.Builder builder = null;
|
||||||
String format = type.getFormat(indexSettings);
|
String format = type.getFormat(indexSettings);
|
||||||
|
@ -251,8 +290,10 @@ public class IndexFieldDataService extends AbstractIndexComponent {
|
||||||
|
|
||||||
GlobalOrdinalsBuilder globalOrdinalBuilder = new InternalGlobalOrdinalsBuilder(index(), indexSettings);
|
GlobalOrdinalsBuilder globalOrdinalBuilder = new InternalGlobalOrdinalsBuilder(index(), indexSettings);
|
||||||
fieldData = builder.build(index, indexSettings, mapper, cache, circuitBreakerService, indexService.mapperService(), globalOrdinalBuilder);
|
fieldData = builder.build(index, indexSettings, mapper, cache, circuitBreakerService, indexService.mapperService(), globalOrdinalBuilder);
|
||||||
loadedFieldData.put(fieldNames.indexName(), fieldData);
|
loadedFieldData.put(key, fieldData);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
fieldLoadingLock.release(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (IFD) fieldData;
|
return (IFD) fieldData;
|
||||||
|
|
Loading…
Reference in New Issue