Test: fix racing condition in IndicesRequestTests
a request could be captured after action array was cleared.
This commit is contained in:
parent
635ae29bf1
commit
cb0d462aa0
|
@ -106,10 +106,6 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
|
||||
|
@ -903,24 +899,24 @@ public class IndicesRequestTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
public static class InterceptingTransportService extends TransportService {
|
||||
|
||||
private final Set<String> actions = new CopyOnWriteArraySet<>();
|
||||
private final Set<String> actions = new HashSet<>();
|
||||
|
||||
private final ConcurrentMap<String, List<TransportRequest>> requests = new ConcurrentHashMap<>();
|
||||
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
|
||||
|
||||
@Inject
|
||||
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
|
||||
super(settings, transport, threadPool);
|
||||
}
|
||||
|
||||
List<TransportRequest> consumeRequests(String action) {
|
||||
synchronized List<TransportRequest> consumeRequests(String action) {
|
||||
return requests.remove(action);
|
||||
}
|
||||
|
||||
void interceptTransportActions(String... actions) {
|
||||
synchronized void interceptTransportActions(String... actions) {
|
||||
Collections.addAll(this.actions, actions);
|
||||
}
|
||||
|
||||
void clearInterceptedActions() {
|
||||
synchronized void clearInterceptedActions() {
|
||||
actions.clear();
|
||||
}
|
||||
|
||||
|
@ -945,14 +941,17 @@ public class IndicesRequestTests extends ElasticsearchIntegrationTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
|
||||
if (actions.contains(action)) {
|
||||
List<TransportRequest> requestList = new CopyOnWriteArrayList<>();
|
||||
requestList.add(request);
|
||||
List<TransportRequest> transportRequests = requests.putIfAbsent(action, requestList);
|
||||
if (transportRequests != null) {
|
||||
transportRequests.add(request);
|
||||
synchronized (InterceptingTransportService.this) {
|
||||
if (actions.contains(action)) {
|
||||
List<TransportRequest> requestList = requests.get(action);
|
||||
if (requestList == null) {
|
||||
requestList = new ArrayList<>();
|
||||
requestList.add(request);
|
||||
requests.put(action, requestList);
|
||||
} else {
|
||||
requestList.add(request);
|
||||
}
|
||||
}
|
||||
}
|
||||
requestHandler.messageReceived(request, channel);
|
||||
|
|
Loading…
Reference in New Issue