diff --git a/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdCache.java b/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdCache.java index 41ec6e1cc29..7b6f867b904 100644 --- a/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdCache.java +++ b/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdCache.java @@ -321,6 +321,7 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se */ public HashedBytesArray canReuse(HashedBytesArray id) { if (idToDoc.containsKey(id)) { + // we can use #lkey() since this is called from a synchronized block return idToDoc.lkey(); } else { return id; diff --git a/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdReaderTypeCache.java b/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdReaderTypeCache.java index e27f9d9290c..8736a3bdacd 100644 --- a/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdReaderTypeCache.java +++ b/src/main/java/org/elasticsearch/index/cache/id/simple/SimpleIdReaderTypeCache.java @@ -60,7 +60,9 @@ public class SimpleIdReaderTypeCache implements IdReaderTypeCache { public int docById(HashedBytesArray uid) { if (idToDoc.containsKey(uid)) { - return idToDoc.lget(); + // We can't use #lget() here since the idToDoc map shared across threads, so we really need a second lookup... + // BTW: This method is only used via TopChildrenQuery + return idToDoc.get(uid); } else { return -1; } @@ -82,6 +84,7 @@ public class SimpleIdReaderTypeCache implements IdReaderTypeCache { */ public HashedBytesArray canReuse(HashedBytesArray id) { if (idToDoc.containsKey(id)) { + // we can use #lkey() since this is called from a synchronized block return idToDoc.lkey(); } else { return id; diff --git a/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java b/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java index 681b9e4e559..a4403e49472 100644 --- a/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java +++ b/src/test/java/org/elasticsearch/search/child/SimpleChildQuerySearchTests.java @@ -45,6 +45,8 @@ import org.junit.Test; import java.io.IOException; import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -1894,7 +1896,65 @@ public class SimpleChildQuerySearchTests extends AbstractIntegrationTest { } catch (MergeMappingException e) { assertThat(e.getMessage(), equalTo("Merge failed with failures {[The _parent field can't be added or updated]}")); } + } + @Test + // The SimpleIdReaderTypeCache#docById method used lget, which can't be used if a map is shared. + public void testTopChildrenBug_concurrencyIssue() throws Exception { + client().admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + ).execute().actionGet(); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + client().admin().indices().preparePutMapping("test").setType("child").setSource(jsonBuilder().startObject().startObject("type") + .startObject("_parent").field("type", "parent").endObject() + .endObject().endObject()).execute().actionGet(); + + // index simple data + client().prepareIndex("test", "parent", "p1").setSource("p_field", "p_value1").execute().actionGet(); + client().prepareIndex("test", "parent", "p2").setSource("p_field", "p_value2").execute().actionGet(); + client().prepareIndex("test", "child", "c1").setParent("p1").setSource("c_field", "blue").execute().actionGet(); + client().prepareIndex("test", "child", "c2").setParent("p1").setSource("c_field", "red").execute().actionGet(); + client().prepareIndex("test", "child", "c3").setParent("p2").setSource("c_field", "red").execute().actionGet(); + client().admin().indices().prepareRefresh("test").execute().actionGet(); + + int numThreads = 10; + final CountDownLatch latch = new CountDownLatch(numThreads); + final AtomicReference holder = new AtomicReference(); + Runnable r = new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < 100; i++) { + SearchResponse searchResponse = client().prepareSearch("test") + .setQuery(topChildrenQuery("child", termQuery("c_field", "blue"))) + .execute().actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), equalTo(1l)); + + searchResponse = client().prepareSearch("test") + .setQuery(topChildrenQuery("child", termQuery("c_field", "red"))) + .execute().actionGet(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().totalHits(), equalTo(2l)); + } + } catch (AssertionError error) { + holder.set(error); + } finally { + latch.countDown(); + } + } + }; + + for (int i = 0; i < 10; i++) { + new Thread(r).start(); + } + latch.await(); + if (holder.get() != null) { + throw holder.get(); + } } }