Use a threadsafe map in SearchAsyncActionTests (#33700)
Today `SearchAsyncActionTests#testFanOutAndCollect` uses a simple `HashMap` for the `nodeToContextMap` variable, which is then accessed from multiple threads, apparently without explicit synchronisation. This explains the test failure identified in #29242, in which `.toString()` returns `"[]"` just before `.isEmpty()` returns `false` even though no concurrent modification took place. This change converts `nodeToContextMap` to a `newConcurrentMap()` so that this cannot occur. It also fixes a race condition in the detection of a double-invocation of the subsequent search phase. Closes #29242.
This commit is contained in:
parent
5166dd0a4c
commit
7c63f5455b
|
@ -52,9 +52,13 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
|
||||
|
||||
public class SearchAsyncActionTests extends ESTestCase {
|
||||
|
||||
public void testSkipSearchShards() throws InterruptedException {
|
||||
|
@ -139,7 +143,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
|
||||
return new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
@ -260,7 +264,6 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
SearchRequest request = new SearchRequest();
|
||||
request.allowPartialSearchResults(true);
|
||||
request.setMaxConcurrentShardRequests(randomIntBetween(1, 100));
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
|
||||
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
|
@ -277,7 +280,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
|
||||
Map<DiscoveryNode, Set<Long>> nodeToContextMap = new HashMap<>();
|
||||
Map<DiscoveryNode, Set<Long>> nodeToContextMap = newConcurrentMap();
|
||||
AtomicInteger contextIdGenerator = new AtomicInteger(0);
|
||||
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
|
||||
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
|
||||
|
@ -296,6 +299,8 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
|
||||
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
|
||||
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors()));
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean latchTriggered = new AtomicBoolean();
|
||||
AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction =
|
||||
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
|
||||
"test",
|
||||
|
@ -326,7 +331,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
Transport.Connection connection = getConnection(null, shard.currentNodeId());
|
||||
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
|
||||
connection.getNode());
|
||||
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>());
|
||||
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> newConcurrentSet());
|
||||
ids.add(testSearchPhaseResult.getRequestId());
|
||||
if (randomBoolean()) {
|
||||
listener.onResponse(testSearchPhaseResult);
|
||||
|
@ -339,15 +344,15 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
|
||||
return new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
public void run() {
|
||||
for (int i = 0; i < results.getNumShards(); i++) {
|
||||
TestSearchPhaseResult result = results.getAtomicArray().get(i);
|
||||
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
|
||||
sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node), OriginalIndices.NONE);
|
||||
}
|
||||
responseListener.onResponse(response);
|
||||
if (latch.getCount() == 0) {
|
||||
throw new AssertionError("Running a search phase after the latch has reached 0 !!!!");
|
||||
if (latchTriggered.compareAndSet(false, true) == false) {
|
||||
throw new AssertionError("latch triggered twice");
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue