From 7678ab52644ebeedf8a3428382cadb065fd24f51 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 19 Dec 2014 17:34:08 +0100 Subject: [PATCH] Parent/child: Fix concurrency issues of the _parent field data. `_parent` field data mistakenly shared some stateful data-structures across threads. Close #8396 --- .../plain/ParentChildIndexFieldData.java | 223 ++++++++++-------- .../fielddata/ParentChildFieldDataTests.java | 75 +++++- 2 files changed, 199 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java index 87da44b8de6..f26dc9f5c11 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/plain/ParentChildIndexFieldData.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata.plain; import com.carrotsearch.hppc.ObjectObjectOpenHashMap; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.collect.ImmutableSortedSet; + import org.apache.lucene.index.*; import org.apache.lucene.index.MultiDocValues.OrdinalMap; import org.apache.lucene.util.Accountable; @@ -35,6 +36,8 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -271,71 +274,49 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData types = new HashMap<>(); + final Set parentTypes = new HashSet<>(); synchronized (lock) { - for (BytesRef type : parentTypes) { - final SortedDocValues[] values = new SortedDocValues[indexReader.leaves().size()]; - Arrays.fill(values, DocValues.emptySorted()); - types.put(type.utf8ToString(), values); - } - } - - for (Map.Entry entry : types.entrySet()) { - final String parentType = entry.getKey(); - final SortedDocValues[] values = entry.getValue(); - for (LeafReaderContext context : indexReader.leaves()) { - SortedDocValues vals = load(context).getOrdinalsValues(parentType); - if (vals != null) { - values[context.ord] = vals; - } + for (BytesRef type : this.parentTypes) { + parentTypes.add(type.utf8ToString()); } } long ramBytesUsed = 0; - @SuppressWarnings("unchecked") - final Map[] global = new Map[indexReader.leaves().size()]; - for (Map.Entry entry : types.entrySet()) { - final String parentType = entry.getKey(); - final SortedDocValues[] values = entry.getValue(); - final OrdinalMap ordinalMap = OrdinalMap.build(null, entry.getValue(), PackedInts.DEFAULT); - ramBytesUsed += ordinalMap.ramBytesUsed(); - for (int i = 0; i < values.length; ++i) { - final SortedDocValues segmentValues = values[i]; - final LongValues globalOrds = ordinalMap.getGlobalOrds(i); - final SortedDocValues globalSortedValues = new SortedDocValues() { - @Override - public BytesRef lookupOrd(int ord) { - final int segmentNum = ordinalMap.getFirstSegmentNumber(ord); - final int segmentOrd = (int) ordinalMap.getFirstSegmentOrd(ord); - return values[segmentNum].lookupOrd(segmentOrd); - } - - @Override - public int getValueCount() { - return (int) ordinalMap.getValueCount(); - } - - @Override - public int getOrd(int docID) { - final int segmentOrd = segmentValues.getOrd(docID); - // TODO: is there a way we can get rid of this branch? - if (segmentOrd >= 0) { - return (int) globalOrds.get(segmentOrd); - } else { - return segmentOrd; - } - } - }; - Map perSegmentGlobal = global[i]; - if (perSegmentGlobal == null) { - perSegmentGlobal = new HashMap<>(1); - global[i] = perSegmentGlobal; - } - perSegmentGlobal.put(parentType, globalSortedValues); + final Map perType = new HashMap<>(); + for (String type : parentTypes) { + final AtomicParentChildFieldData[] fieldData = new AtomicParentChildFieldData[indexReader.leaves().size()]; + for (LeafReaderContext context : indexReader.leaves()) { + fieldData[context.ord] = load(context); } + final OrdinalMap ordMap = buildOrdinalMap(fieldData, type); + ramBytesUsed += ordMap.ramBytesUsed(); + perType.put(type, new OrdinalMapAndAtomicFieldData(ordMap, fieldData)); + } + + final AtomicParentChildFieldData[] fielddata = new AtomicParentChildFieldData[indexReader.leaves().size()]; + for (int i = 0; i < fielddata.length; ++i) { + fielddata[i] = new GlobalAtomicFieldData(parentTypes, perType, i); } breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed); @@ -346,52 +327,72 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData types; + private final Map atomicFD; + private final int segmentIndex; - GlobalFieldData(IndexReader reader, final Map[] globalValues, long ramBytesUsed) { - this.reader = reader; - this.ramBytesUsed = ramBytesUsed; - this.atomicFDs = new AtomicParentChildFieldData[globalValues.length]; - for (int i = 0; i < globalValues.length; ++i) { - final int ord = i; - atomicFDs[i] = new AbstractAtomicParentChildFieldData() { - @Override - public long ramBytesUsed() { - return 0; + public GlobalAtomicFieldData(Set types, Map atomicFD, int segmentIndex) { + this.types = types; + this.atomicFD = atomicFD; + this.segmentIndex = segmentIndex; + } + + @Override + public Set types() { + return types; + } + + @Override + public SortedDocValues getOrdinalsValues(String type) { + final OrdinalMapAndAtomicFieldData atomicFD = this.atomicFD.get(type); + final OrdinalMap ordMap = atomicFD.ordMap; + final SortedDocValues[] allSegmentValues = new SortedDocValues[atomicFD.fieldData.length]; + for (int i = 0; i < allSegmentValues.length; ++i) { + allSegmentValues[i] = atomicFD.fieldData[i].getOrdinalsValues(type); + } + final SortedDocValues segmentValues = allSegmentValues[segmentIndex]; + if (segmentValues.getValueCount() == ordMap.getValueCount()) { + // ords are already global + return segmentValues; + } + final LongValues globalOrds = ordMap.getGlobalOrds(segmentIndex); + return new SortedDocValues() { + + @Override + public BytesRef lookupOrd(int ord) { + final int segmentIndex = ordMap.getFirstSegmentNumber(ord); + final int segmentOrd = (int) ordMap.getFirstSegmentOrd(ord); + return allSegmentValues[segmentIndex].lookupOrd(segmentOrd); + } + + @Override + public int getValueCount() { + return (int) ordMap.getValueCount(); + } + + @Override + public int getOrd(int docID) { + final int segmentOrd = segmentValues.getOrd(docID); + // TODO: is there a way we can get rid of this branch? + if (segmentOrd >= 0) { + return (int) globalOrds.get(segmentOrd); + } else { + return segmentOrd; } + } + }; + } - @Override - public Iterable getChildResources() { - // TODO: is this really the best? - return Collections.emptyList(); - } - - @Override - public void close() { - } - - @Override - public Set types() { - return Collections.unmodifiableSet(globalValues[ord].keySet()); - } - - @Override - public SortedDocValues getOrdinalsValues(String type) { - SortedDocValues dv = globalValues[ord].get(type); - if (dv == null) { - dv = DocValues.emptySorted(); - } - return dv; - } - }; - } + @Override + public long ramBytesUsed() { + // this class does not take memory on its own, the index-level field data does + // it through the use of ordinal maps + return 0; } @Override @@ -399,6 +400,29 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData closeables = new ArrayList<>(); + for (OrdinalMapAndAtomicFieldData fds : atomicFD.values()) { + closeables.addAll(Arrays.asList(fds.fieldData)); + } + Releasables.close(closeables); + } + + } + + private class GlobalFieldData implements IndexParentChildFieldData, Accountable { + + private final AtomicParentChildFieldData[] fielddata; + private final IndexReader reader; + private final long ramBytesUsed; + + GlobalFieldData(IndexReader reader, AtomicParentChildFieldData[] fielddata, long ramBytesUsed) { + this.reader = reader; + this.ramBytesUsed = ramBytesUsed; + this.fielddata = fielddata; + } + @Override public Names getFieldNames() { return ParentChildIndexFieldData.this.getFieldNames(); @@ -412,7 +436,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData getChildResources() { + return Collections.emptyList(); + } + @Override public IndexParentChildFieldData loadGlobal(IndexReader indexReader) { if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) { diff --git a/src/test/java/org/elasticsearch/index/fielddata/ParentChildFieldDataTests.java b/src/test/java/org/elasticsearch/index/fielddata/ParentChildFieldDataTests.java index 852ae0eef7a..144d137f46d 100644 --- a/src/test/java/org/elasticsearch/index/fielddata/ParentChildFieldDataTests.java +++ b/src/test/java/org/elasticsearch/index/fielddata/ParentChildFieldDataTests.java @@ -23,11 +23,19 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.search.*; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource; +import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.UidFieldMapper; @@ -35,7 +43,14 @@ import org.elasticsearch.search.MultiValueMode; import org.junit.Before; import org.junit.Test; -import static org.hamcrest.Matchers.*; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.nullValue; /** */ @@ -184,6 +199,62 @@ public class ParentChildFieldDataTests extends AbstractFieldDataTests { assertThat(((FieldDoc) topDocs.scoreDocs[7]).fields[0], nullValue()); } + public void testThreads() throws Exception { + final ParentChildIndexFieldData indexFieldData = getForField(childType); + final DirectoryReader reader = DirectoryReader.open(writer, true); + final IndexParentChildFieldData global = indexFieldData.loadGlobal(reader); + final AtomicReference error = new AtomicReference<>(); + final int numThreads = scaledRandomIntBetween(3, 8); + final Thread[] threads = new Thread[numThreads]; + final CountDownLatch latch = new CountDownLatch(1); + + final Map expected = new HashMap<>(); + for (LeafReaderContext context : reader.leaves()) { + AtomicParentChildFieldData leafData = global.load(context); + SortedDocValues parentIds = leafData.getOrdinalsValues(parentType); + final BytesRef[] ids = new BytesRef[parentIds.getValueCount()]; + for (int j = 0; j < parentIds.getValueCount(); ++j) { + final BytesRef id = parentIds.lookupOrd(j); + if (id != null) { + ids[j] = BytesRef.deepCopyOf(id); + } + } + expected.put(context.reader().getCoreCacheKey(), ids); + } + + for (int i = 0; i < numThreads; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + try { + latch.await(); + for (int i = 0; i < 100000; ++i) { + for (LeafReaderContext context : reader.leaves()) { + AtomicParentChildFieldData leafData = global.load(context); + SortedDocValues parentIds = leafData.getOrdinalsValues(parentType); + final BytesRef[] expectedIds = expected.get(context.reader().getCoreCacheKey()); + for (int j = 0; j < parentIds.getValueCount(); ++j) { + final BytesRef id = parentIds.lookupOrd(j); + assertEquals(expectedIds[j], id); + } + } + } + } catch (Exception e) { + error.compareAndSet(null, e); + } + } + }; + threads[i].start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } + @Override protected FieldDataType getFieldDataType() { return new FieldDataType("_parent");