Parent/child: Fix concurrency issues of the _parent field data.

`_parent` field data mistakenly shared some stateful data-structures across
threads.

Close #8396
This commit is contained in:
Adrien Grand 2014-12-19 17:34:08 +01:00
parent 67eba23b2d
commit 7678ab5264
2 changed files with 199 additions and 99 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata.plain;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap; import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.ImmutableSortedSet;
import org.apache.lucene.index.*; import org.apache.lucene.index.*;
import org.apache.lucene.index.MultiDocValues.OrdinalMap; import org.apache.lucene.index.MultiDocValues.OrdinalMap;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
@ -35,6 +36,8 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -271,71 +274,49 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
} }
} }
private static OrdinalMap buildOrdinalMap(AtomicParentChildFieldData[] atomicFD, String parentType) throws IOException {
final SortedDocValues[] ordinals = new SortedDocValues[atomicFD.length];
for (int i = 0; i < ordinals.length; ++i) {
ordinals[i] = atomicFD[i].getOrdinalsValues(parentType);
}
return OrdinalMap.build(null, ordinals, PackedInts.DEFAULT);
}
private static class OrdinalMapAndAtomicFieldData {
final OrdinalMap ordMap;
final AtomicParentChildFieldData[] fieldData;
public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldData[] fieldData) {
this.ordMap = ordMap;
this.fieldData = fieldData;
}
}
@Override @Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception { public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
final long startTime = System.nanoTime(); final long startTime = System.nanoTime();
final Map<String, SortedDocValues[]> types = new HashMap<>(); final Set<String> parentTypes = new HashSet<>();
synchronized (lock) { synchronized (lock) {
for (BytesRef type : parentTypes) { for (BytesRef type : this.parentTypes) {
final SortedDocValues[] values = new SortedDocValues[indexReader.leaves().size()]; parentTypes.add(type.utf8ToString());
Arrays.fill(values, DocValues.emptySorted());
types.put(type.utf8ToString(), values);
}
}
for (Map.Entry<String, SortedDocValues[]> entry : types.entrySet()) {
final String parentType = entry.getKey();
final SortedDocValues[] values = entry.getValue();
for (LeafReaderContext context : indexReader.leaves()) {
SortedDocValues vals = load(context).getOrdinalsValues(parentType);
if (vals != null) {
values[context.ord] = vals;
}
} }
} }
long ramBytesUsed = 0; long ramBytesUsed = 0;
@SuppressWarnings("unchecked") final Map<String, OrdinalMapAndAtomicFieldData> perType = new HashMap<>();
final Map<String, SortedDocValues>[] global = new Map[indexReader.leaves().size()]; for (String type : parentTypes) {
for (Map.Entry<String, SortedDocValues[]> entry : types.entrySet()) { final AtomicParentChildFieldData[] fieldData = new AtomicParentChildFieldData[indexReader.leaves().size()];
final String parentType = entry.getKey(); for (LeafReaderContext context : indexReader.leaves()) {
final SortedDocValues[] values = entry.getValue(); fieldData[context.ord] = load(context);
final OrdinalMap ordinalMap = OrdinalMap.build(null, entry.getValue(), PackedInts.DEFAULT);
ramBytesUsed += ordinalMap.ramBytesUsed();
for (int i = 0; i < values.length; ++i) {
final SortedDocValues segmentValues = values[i];
final LongValues globalOrds = ordinalMap.getGlobalOrds(i);
final SortedDocValues globalSortedValues = new SortedDocValues() {
@Override
public BytesRef lookupOrd(int ord) {
final int segmentNum = ordinalMap.getFirstSegmentNumber(ord);
final int segmentOrd = (int) ordinalMap.getFirstSegmentOrd(ord);
return values[segmentNum].lookupOrd(segmentOrd);
}
@Override
public int getValueCount() {
return (int) ordinalMap.getValueCount();
}
@Override
public int getOrd(int docID) {
final int segmentOrd = segmentValues.getOrd(docID);
// TODO: is there a way we can get rid of this branch?
if (segmentOrd >= 0) {
return (int) globalOrds.get(segmentOrd);
} else {
return segmentOrd;
}
}
};
Map<String, SortedDocValues> perSegmentGlobal = global[i];
if (perSegmentGlobal == null) {
perSegmentGlobal = new HashMap<>(1);
global[i] = perSegmentGlobal;
}
perSegmentGlobal.put(parentType, globalSortedValues);
} }
final OrdinalMap ordMap = buildOrdinalMap(fieldData, type);
ramBytesUsed += ordMap.ramBytesUsed();
perType.put(type, new OrdinalMapAndAtomicFieldData(ordMap, fieldData));
}
final AtomicParentChildFieldData[] fielddata = new AtomicParentChildFieldData[indexReader.leaves().size()];
for (int i = 0; i < fielddata.length; ++i) {
fielddata[i] = new GlobalAtomicFieldData(parentTypes, perType, i);
} }
breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed); breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed);
@ -346,52 +327,72 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
); );
} }
return new GlobalFieldData(indexReader, global, ramBytesUsed); return new GlobalFieldData(indexReader, fielddata, ramBytesUsed);
} }
private class GlobalFieldData implements IndexParentChildFieldData, Accountable { private static class GlobalAtomicFieldData extends AbstractAtomicParentChildFieldData {
private final AtomicParentChildFieldData[] atomicFDs; private final Set<String> types;
private final IndexReader reader; private final Map<String, OrdinalMapAndAtomicFieldData> atomicFD;
private final long ramBytesUsed; private final int segmentIndex;
GlobalFieldData(IndexReader reader, final Map<String, SortedDocValues>[] globalValues, long ramBytesUsed) { public GlobalAtomicFieldData(Set<String> types, Map<String, OrdinalMapAndAtomicFieldData> atomicFD, int segmentIndex) {
this.reader = reader; this.types = types;
this.ramBytesUsed = ramBytesUsed; this.atomicFD = atomicFD;
this.atomicFDs = new AtomicParentChildFieldData[globalValues.length]; this.segmentIndex = segmentIndex;
for (int i = 0; i < globalValues.length; ++i) { }
final int ord = i;
atomicFDs[i] = new AbstractAtomicParentChildFieldData() { @Override
@Override public Set<String> types() {
public long ramBytesUsed() { return types;
return 0; }
@Override
public SortedDocValues getOrdinalsValues(String type) {
final OrdinalMapAndAtomicFieldData atomicFD = this.atomicFD.get(type);
final OrdinalMap ordMap = atomicFD.ordMap;
final SortedDocValues[] allSegmentValues = new SortedDocValues[atomicFD.fieldData.length];
for (int i = 0; i < allSegmentValues.length; ++i) {
allSegmentValues[i] = atomicFD.fieldData[i].getOrdinalsValues(type);
}
final SortedDocValues segmentValues = allSegmentValues[segmentIndex];
if (segmentValues.getValueCount() == ordMap.getValueCount()) {
// ords are already global
return segmentValues;
}
final LongValues globalOrds = ordMap.getGlobalOrds(segmentIndex);
return new SortedDocValues() {
@Override
public BytesRef lookupOrd(int ord) {
final int segmentIndex = ordMap.getFirstSegmentNumber(ord);
final int segmentOrd = (int) ordMap.getFirstSegmentOrd(ord);
return allSegmentValues[segmentIndex].lookupOrd(segmentOrd);
}
@Override
public int getValueCount() {
return (int) ordMap.getValueCount();
}
@Override
public int getOrd(int docID) {
final int segmentOrd = segmentValues.getOrd(docID);
// TODO: is there a way we can get rid of this branch?
if (segmentOrd >= 0) {
return (int) globalOrds.get(segmentOrd);
} else {
return segmentOrd;
} }
}
};
}
@Override @Override
public Iterable<Accountable> getChildResources() { public long ramBytesUsed() {
// TODO: is this really the best? // this class does not take memory on its own, the index-level field data does
return Collections.emptyList(); // it through the use of ordinal maps
} return 0;
@Override
public void close() {
}
@Override
public Set<String> types() {
return Collections.unmodifiableSet(globalValues[ord].keySet());
}
@Override
public SortedDocValues getOrdinalsValues(String type) {
SortedDocValues dv = globalValues[ord].get(type);
if (dv == null) {
dv = DocValues.emptySorted();
}
return dv;
}
};
}
} }
@Override @Override
@ -399,6 +400,29 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
return Collections.emptyList(); return Collections.emptyList();
} }
@Override
public void close() throws ElasticsearchException {
List<Releasable> closeables = new ArrayList<>();
for (OrdinalMapAndAtomicFieldData fds : atomicFD.values()) {
closeables.addAll(Arrays.asList(fds.fieldData));
}
Releasables.close(closeables);
}
}
private class GlobalFieldData implements IndexParentChildFieldData, Accountable {
private final AtomicParentChildFieldData[] fielddata;
private final IndexReader reader;
private final long ramBytesUsed;
GlobalFieldData(IndexReader reader, AtomicParentChildFieldData[] fielddata, long ramBytesUsed) {
this.reader = reader;
this.ramBytesUsed = ramBytesUsed;
this.fielddata = fielddata;
}
@Override @Override
public Names getFieldNames() { public Names getFieldNames() {
return ParentChildIndexFieldData.this.getFieldNames(); return ParentChildIndexFieldData.this.getFieldNames();
@ -412,7 +436,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
@Override @Override
public AtomicParentChildFieldData load(LeafReaderContext context) { public AtomicParentChildFieldData load(LeafReaderContext context) {
assert context.reader().getCoreCacheKey() == reader.leaves().get(context.ord).reader().getCoreCacheKey(); assert context.reader().getCoreCacheKey() == reader.leaves().get(context.ord).reader().getCoreCacheKey();
return atomicFDs[context.ord]; return fielddata[context.ord];
} }
@Override @Override
@ -445,6 +469,11 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<AtomicPare
return ramBytesUsed; return ramBytesUsed;
} }
@Override
public Iterable<Accountable> getChildResources() {
return Collections.emptyList();
}
@Override @Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) { if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) {

View File

@ -23,11 +23,19 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField; import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.*; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource; import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource;
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper;
@ -35,7 +43,14 @@ import org.elasticsearch.search.MultiValueMode;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.*; import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;
/** /**
*/ */
@ -184,6 +199,62 @@ public class ParentChildFieldDataTests extends AbstractFieldDataTests {
assertThat(((FieldDoc) topDocs.scoreDocs[7]).fields[0], nullValue()); assertThat(((FieldDoc) topDocs.scoreDocs[7]).fields[0], nullValue());
} }
public void testThreads() throws Exception {
final ParentChildIndexFieldData indexFieldData = getForField(childType);
final DirectoryReader reader = DirectoryReader.open(writer, true);
final IndexParentChildFieldData global = indexFieldData.loadGlobal(reader);
final AtomicReference<Exception> error = new AtomicReference<>();
final int numThreads = scaledRandomIntBetween(3, 8);
final Thread[] threads = new Thread[numThreads];
final CountDownLatch latch = new CountDownLatch(1);
final Map<Object, BytesRef[]> expected = new HashMap<>();
for (LeafReaderContext context : reader.leaves()) {
AtomicParentChildFieldData leafData = global.load(context);
SortedDocValues parentIds = leafData.getOrdinalsValues(parentType);
final BytesRef[] ids = new BytesRef[parentIds.getValueCount()];
for (int j = 0; j < parentIds.getValueCount(); ++j) {
final BytesRef id = parentIds.lookupOrd(j);
if (id != null) {
ids[j] = BytesRef.deepCopyOf(id);
}
}
expected.put(context.reader().getCoreCacheKey(), ids);
}
for (int i = 0; i < numThreads; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
for (int i = 0; i < 100000; ++i) {
for (LeafReaderContext context : reader.leaves()) {
AtomicParentChildFieldData leafData = global.load(context);
SortedDocValues parentIds = leafData.getOrdinalsValues(parentType);
final BytesRef[] expectedIds = expected.get(context.reader().getCoreCacheKey());
for (int j = 0; j < parentIds.getValueCount(); ++j) {
final BytesRef id = parentIds.lookupOrd(j);
assertEquals(expectedIds[j], id);
}
}
}
} catch (Exception e) {
error.compareAndSet(null, e);
}
}
};
threads[i].start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
if (error.get() != null) {
throw error.get();
}
}
@Override @Override
protected FieldDataType getFieldDataType() { protected FieldDataType getFieldDataType() {
return new FieldDataType("_parent"); return new FieldDataType("_parent");