Fixed concurrency issue in simple id cache. The lget() of a map can only be used if the map isn't shared.

This commit is contained in:
Martijn van Groningen 2013-10-28 15:59:22 +01:00
parent c68016bb83
commit 81089fb228
3 changed files with 65 additions and 1 deletions

View File

@ -321,6 +321,7 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
*/ */
public HashedBytesArray canReuse(HashedBytesArray id) { public HashedBytesArray canReuse(HashedBytesArray id) {
if (idToDoc.containsKey(id)) { if (idToDoc.containsKey(id)) {
// we can use #lkey() since this is called from a synchronized block
return idToDoc.lkey(); return idToDoc.lkey();
} else { } else {
return id; return id;

View File

@ -60,7 +60,9 @@ public class SimpleIdReaderTypeCache implements IdReaderTypeCache {
public int docById(HashedBytesArray uid) { public int docById(HashedBytesArray uid) {
if (idToDoc.containsKey(uid)) { if (idToDoc.containsKey(uid)) {
return idToDoc.lget(); // We can't use #lget() here since the idToDoc map shared across threads, so we really need a second lookup...
// BTW: This method is only used via TopChildrenQuery
return idToDoc.get(uid);
} else { } else {
return -1; return -1;
} }
@ -82,6 +84,7 @@ public class SimpleIdReaderTypeCache implements IdReaderTypeCache {
*/ */
public HashedBytesArray canReuse(HashedBytesArray id) { public HashedBytesArray canReuse(HashedBytesArray id) {
if (idToDoc.containsKey(id)) { if (idToDoc.containsKey(id)) {
// we can use #lkey() since this is called from a synchronized block
return idToDoc.lkey(); return idToDoc.lkey();
} else { } else {
return id; return id;

View File

@ -45,6 +45,8 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -1894,7 +1896,65 @@ public class SimpleChildQuerySearchTests extends AbstractIntegrationTest {
} catch (MergeMappingException e) { } catch (MergeMappingException e) {
assertThat(e.getMessage(), equalTo("Merge failed with failures {[The _parent field can't be added or updated]}")); assertThat(e.getMessage(), equalTo("Merge failed with failures {[The _parent field can't be added or updated]}"));
} }
}
@Test
// The SimpleIdReaderTypeCache#docById method used lget, which can't be used if a map is shared.
public void testTopChildrenBug_concurrencyIssue() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
client().admin().indices().preparePutMapping("test").setType("child").setSource(jsonBuilder().startObject().startObject("type")
.startObject("_parent").field("type", "parent").endObject()
.endObject().endObject()).execute().actionGet();
// index simple data
client().prepareIndex("test", "parent", "p1").setSource("p_field", "p_value1").execute().actionGet();
client().prepareIndex("test", "parent", "p2").setSource("p_field", "p_value2").execute().actionGet();
client().prepareIndex("test", "child", "c1").setParent("p1").setSource("c_field", "blue").execute().actionGet();
client().prepareIndex("test", "child", "c2").setParent("p1").setSource("c_field", "red").execute().actionGet();
client().prepareIndex("test", "child", "c3").setParent("p2").setSource("c_field", "red").execute().actionGet();
client().admin().indices().prepareRefresh("test").execute().actionGet();
int numThreads = 10;
final CountDownLatch latch = new CountDownLatch(numThreads);
final AtomicReference<AssertionError> holder = new AtomicReference<AssertionError>();
Runnable r = new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(topChildrenQuery("child", termQuery("c_field", "blue")))
.execute().actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
searchResponse = client().prepareSearch("test")
.setQuery(topChildrenQuery("child", termQuery("c_field", "red")))
.execute().actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(2l));
}
} catch (AssertionError error) {
holder.set(error);
} finally {
latch.countDown();
}
}
};
for (int i = 0; i < 10; i++) {
new Thread(r).start();
}
latch.await();
if (holder.get() != null) {
throw holder.get();
}
} }
} }