Fix fielddata handling for the `_parent` field.

Change #12371 broke fielddata on the `_parent` child for indices created before
2.0. This pull request adds back caching of the `_parent` fielddata for indices
created before 2.0 and cleans some related stuff. For instance
DocumentTypeListener doesn't need to take care of removed mappings anymore since
mappings can't be removed in 2.0.
This commit is contained in:
Adrien Grand 2015-07-23 14:01:00 +02:00
parent 948da82f90
commit 164d8c1ce2
6 changed files with 94 additions and 200 deletions

View File

@ -22,10 +22,7 @@ package org.elasticsearch.common.util.concurrent;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This class manages locks. Locks can be accessed with an identifier and are
@ -115,58 +112,4 @@ public class KeyedLock<T> {
return !map.isEmpty();
}
/**
* A {@link KeyedLock} that allows to acquire a global lock that guarantees
* exclusive access to the resource the KeyedLock is guarding.
*/
public final static class GlobalLockable<T> extends KeyedLock<T> {
private final ReadWriteLock lock;
public GlobalLockable(boolean fair){
super(fair);
lock = new ReentrantReadWriteLock(fair);
}
public GlobalLockable() {
this(false);
}
@Override
public void acquire(T key) {
boolean success = false;
lock.readLock().lock();
try {
super.acquire(key);
success = true;
} finally {
if (!success) {
lock.readLock().unlock();
}
}
}
@Override
public void release(T key) {
KeyLock keyLock = threadLocal.get();
if (keyLock == null) {
throw new IllegalStateException("Lock not acquired");
}
try {
release(key, keyLock);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns a global lock guaranteeing exclusive access to the resource
* this KeyedLock is guarding.
*/
public Lock globalLock() {
return lock.writeLock();
}
}
}

View File

@ -21,12 +21,13 @@ package org.elasticsearch.index.fielddata;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.plain.*;
@ -136,11 +137,16 @@ public class IndexFieldDataService extends AbstractIndexComponent {
}
private final IndicesFieldDataCache indicesFieldDataCache;
private final KeyedLock.GlobalLockable<String> fieldLoadingLock = new KeyedLock.GlobalLockable<>();
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap(); // no need for concurrency support, always used under lock
// the below map needs to be modified under a lock
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap();
IndexService indexService;
// We need to cache fielddata on the _parent field because of 1.x indices.
// When we don't support 1.x anymore (3.0) then remove this caching
// This variable needs to be read/written under lock
private IndexFieldData<?> parentIndexFieldData;
@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
@ -154,9 +160,8 @@ public class IndexFieldDataService extends AbstractIndexComponent {
this.indexService = indexService;
}
public void clear() {
fieldLoadingLock.globalLock().lock();
try {
public synchronized void clear() {
parentIndexFieldData = null;
List<Throwable> exceptions = new ArrayList<>(0);
final Collection<IndexFieldDataCache> fieldDataCacheValues = fieldDataCaches.values();
for (IndexFieldDataCache cache : fieldDataCacheValues) {
@ -168,14 +173,12 @@ public class IndexFieldDataService extends AbstractIndexComponent {
}
fieldDataCacheValues.clear();
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
} finally {
fieldLoadingLock.globalLock().unlock();
}
}
public void clearField(final String fieldName) {
fieldLoadingLock.acquire(fieldName);
try {
public synchronized void clearField(final String fieldName) {
if (ParentFieldMapper.NAME.equals(fieldName)) {
parentIndexFieldData = null;
}
List<Throwable> exceptions = new ArrayList<>(0);
final IndexFieldDataCache cache = fieldDataCaches.remove(fieldName);
if (cache != null) {
@ -186,9 +189,6 @@ public class IndexFieldDataService extends AbstractIndexComponent {
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
} finally {
fieldLoadingLock.release(fieldName);
}
}
@SuppressWarnings("unchecked")
@ -199,9 +199,6 @@ public class IndexFieldDataService extends AbstractIndexComponent {
throw new IllegalArgumentException("found no fielddata type for field [" + fieldNames.fullName() + "]");
}
final boolean docValues = fieldType.hasDocValues();
final String key = fieldNames.indexName();
fieldLoadingLock.acquire(key);
try {
IndexFieldData.Builder builder = null;
String format = type.getFormat(indexSettings);
if (format != null && FieldDataType.DOC_VALUES_FORMAT_VALUE.equals(format) && !docValues) {
@ -224,7 +221,9 @@ public class IndexFieldDataService extends AbstractIndexComponent {
throw new IllegalArgumentException("failed to find field data builder for field " + fieldNames.fullName() + ", and type " + type.getType());
}
IndexFieldDataCache cache = fieldDataCaches.get(fieldNames.indexName());
IndexFieldDataCache cache;
synchronized (this) {
cache = fieldDataCaches.get(fieldNames.indexName());
if (cache == null) {
// we default to node level cache, which in turn defaults to be unbounded
// this means changing the node level settings is simple, just set the bounds there
@ -239,10 +238,18 @@ public class IndexFieldDataService extends AbstractIndexComponent {
fieldDataCaches.put(fieldNames.indexName(), cache);
}
return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
} finally {
fieldLoadingLock.release(key);
// Remove this in 3.0
final boolean isOldParentField = ParentFieldMapper.NAME.equals(fieldNames.indexName())
&& Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1);
if (isOldParentField) {
if (parentIndexFieldData == null) {
parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
}
return (IFD) parentIndexFieldData;
}
}
return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
}
}

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import org.apache.lucene.index.*;
import org.apache.lucene.index.MultiDocValues.OrdinalMap;
import org.apache.lucene.search.DocIdSetIterator;
@ -79,12 +80,23 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
FieldDataType fieldDataType, IndexFieldDataCache cache, MapperService mapperService,
CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
parentTypes = new TreeSet<>();
this.breakerService = breakerService;
if (Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1)) {
parentTypes = new TreeSet<>();
for (DocumentMapper documentMapper : mapperService.docMappers(false)) {
beforeCreate(documentMapper);
}
mapperService.addTypeListener(this);
} else {
ImmutableSortedSet.Builder<String> builder = ImmutableSortedSet.naturalOrder();
for (DocumentMapper mapper : mapperService.docMappers(false)) {
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
builder.add(parentFieldMapper.type());
}
}
parentTypes = builder.build();
}
}
@Override
@ -96,10 +108,6 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
public AtomicParentChildFieldData load(LeafReaderContext context) {
if (Version.indexCreated(indexSettings).onOrAfter(Version.V_2_0_0_beta1)) {
final LeafReader reader = context.reader();
final NavigableSet<String> parentTypes;
synchronized (lock) {
parentTypes = ImmutableSortedSet.copyOf(this.parentTypes);
}
return new AbstractAtomicParentChildFieldData() {
public Set<String> types() {
@ -145,6 +153,8 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
@Override
public AbstractAtomicParentChildFieldData loadDirect(LeafReaderContext context) throws Exception {
// Make this method throw an UnsupportedOperationException in 3.0, only
// needed for indices created BEFORE 2.0
LeafReader reader = context.reader();
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat(
"acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO
@ -219,6 +229,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
@Override
public void beforeCreate(DocumentMapper mapper) {
// Remove in 3.0
synchronized (lock) {
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
@ -231,16 +242,6 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
}
}
@Override
public void afterRemove(DocumentMapper mapper) {
synchronized (lock) {
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
parentTypes.remove(new BytesRef(parentFieldMapper.type()));
}
}
}
@Override
protected AtomicParentChildFieldData empty(int maxDoc) {
return new ParentChildAtomicFieldData(ImmutableOpenMap.<String, AtomicOrdinalsFieldData>of());
@ -358,9 +359,13 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
final long startTime = System.nanoTime();
final Set<String> parentTypes;
if (Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1)) {
synchronized (lock) {
parentTypes = ImmutableSet.copyOf(this.parentTypes);
}
} else {
parentTypes = this.parentTypes;
}
long ramBytesUsed = 0;
final Map<String, OrdinalMapAndAtomicFieldData> perType = new HashMap<>();

View File

@ -30,11 +30,4 @@ public interface DocumentTypeListener {
*/
void beforeCreate(DocumentMapper mapper);
/**
* Invoked just after an existing document type has been removed.
*
* @param mapper The existing document mapper of the type being removed
*/
void afterRemove(DocumentMapper mapper);
}

View File

@ -226,14 +226,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
}
}
@Override
public void afterRemove(DocumentMapper mapper) {
if (PercolatorService.TYPE_NAME.equals(mapper.type())) {
disableRealTimePercolator();
clear();
}
}
}
private class ShardLifecycleListener extends IndicesLifecycle.Listener {

View File

@ -39,7 +39,7 @@ public class KeyedLockTests extends ElasticsearchTestCase {
public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
KeyedLock<String> connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable<String>(randomBoolean()) : new KeyedLock<String>(randomBoolean());
KeyedLock<String> connectionLock = new KeyedLock<String>(randomBoolean());
String[] names = new String[randomIntBetween(1, 40)];
for (int i = 0; i < names.length; i++) {
names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);
@ -54,11 +54,6 @@ public class KeyedLockTests extends ElasticsearchTestCase {
threads[i].start();
}
startLatch.countDown();
for (int i = 0; i < numThreads; i++) {
if (randomBoolean()) {
threads[i].incWithGlobal();
}
}
for (int i = 0; i < numThreads; i++) {
threads[i].join();
@ -74,23 +69,9 @@ public class KeyedLockTests extends ElasticsearchTestCase {
}
}
@Test(expected = IllegalStateException.class)
public void checkCannotAcquireTwoLocksGlobal() throws InterruptedException {
KeyedLock.GlobalLockable<String> connectionLock = new KeyedLock.GlobalLockable<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
try {
connectionLock.acquire(name);
} finally {
connectionLock.release(name);
connectionLock.globalLock().lock();
connectionLock.globalLock().unlock();
}
}
@Test(expected = IllegalStateException.class)
public void checkCannotAcquireTwoLocks() throws InterruptedException {
KeyedLock<String> connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable<String>() : new KeyedLock<String>();
KeyedLock<String> connectionLock = new KeyedLock<String>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
connectionLock.acquire(name);
@ -98,7 +79,7 @@ public class KeyedLockTests extends ElasticsearchTestCase {
@Test(expected = IllegalStateException.class)
public void checkCannotReleaseUnacquiredLock() throws InterruptedException {
KeyedLock<String> connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable<String>() : new KeyedLock<String>();
KeyedLock<String> connectionLock = new KeyedLock<String>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.release(name);
}
@ -149,32 +130,5 @@ public class KeyedLockTests extends ElasticsearchTestCase {
}
}
}
public void incWithGlobal() {
if (connectionLock instanceof KeyedLock.GlobalLockable) {
final int iters = randomIntBetween(10, 200);
for (int i = 0; i < iters; i++) {
((KeyedLock.GlobalLockable) connectionLock).globalLock().lock();
try {
String curName = names[randomInt(names.length - 1)];
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
} finally {
((KeyedLock.GlobalLockable) connectionLock).globalLock().unlock();
}
}
}
}
}
}