Fix filtering of node ids for TransportNodesAction (#18634)
* Fix filtering of node ids for TransportNodesAction Don't mix up member variables with local variables in constructor. closes #18618
This commit is contained in:
parent
148fd3f891
commit
fc1696822f
|
@ -176,12 +176,10 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
this.request = request;
|
||||
this.listener = listener;
|
||||
ClusterState clusterState = clusterService.state();
|
||||
String[] nodesIds = resolveNodes(request, clusterState);
|
||||
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
|
||||
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().getNodes();
|
||||
nodesIds = filterNodeIds(clusterState.nodes(), resolveNodes(request, clusterState));
|
||||
this.nodes = new DiscoveryNode[nodesIds.length];
|
||||
for (int i = 0; i < nodesIds.length; i++) {
|
||||
this.nodes[i] = nodes.get(nodesIds[i]);
|
||||
this.nodes[i] = clusterState.nodes().get(nodesIds[i]);
|
||||
}
|
||||
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
|
||||
}
|
||||
|
|
|
@ -66,9 +66,10 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
|
||||
private ClusterService clusterService;
|
||||
private CapturingTransport transport;
|
||||
private TestTransportNodesAction action;
|
||||
private TransportService transportService;
|
||||
|
||||
public void testRequestIsSentToEachNode() throws Exception {
|
||||
TransportNodesAction action = getTestTransportNodesAction();
|
||||
TestNodesRequest request = new TestNodesRequest();
|
||||
PlainActionFuture<TestNodesResponse> listener = new PlainActionFuture<>();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
|
@ -79,6 +80,7 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testNodesSelectors() {
|
||||
TransportNodesAction action = getTestTransportNodesAction();
|
||||
int numSelectors = randomIntBetween(1, 5);
|
||||
Set<String> nodeSelectors = new HashSet<>();
|
||||
for (int i = 0; i < numSelectors; i++) {
|
||||
|
@ -98,10 +100,12 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testNewResponseNullArray() {
|
||||
TransportNodesAction action = getTestTransportNodesAction();
|
||||
expectThrows(NullPointerException.class, () -> action.newResponse(new TestNodesRequest(), null));
|
||||
}
|
||||
|
||||
public void testNewResponse() {
|
||||
TestTransportNodesAction action = getTestTransportNodesAction();
|
||||
TestNodesRequest request = new TestNodesRequest();
|
||||
List<TestNodeResponse> expectedNodeResponses = mockList(TestNodeResponse.class, randomIntBetween(0, 2));
|
||||
expectedNodeResponses.add(new TestNodeResponse());
|
||||
|
@ -125,6 +129,19 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
assertTrue(failures.containsAll(response.failures()));
|
||||
}
|
||||
|
||||
public void testFiltering() throws Exception {
|
||||
TransportNodesAction action = getFilteringTestTransportNodesAction(transportService);
|
||||
TestNodesRequest request = new TestNodesRequest();
|
||||
PlainActionFuture<TestNodesResponse> listener = new PlainActionFuture<>();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
|
||||
// check requests were only sent to data nodes
|
||||
for (String nodeTarget : capturedRequests.keySet()) {
|
||||
assertTrue(clusterService.state().nodes().get(nodeTarget).isDataNode());
|
||||
}
|
||||
assertEquals(clusterService.state().nodes().getDataNodes().size(), capturedRequests.size());
|
||||
}
|
||||
|
||||
private <T> List<T> mockList(Class<T> clazz, int size) {
|
||||
List<T> failures = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
|
@ -160,7 +177,7 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
super.setUp();
|
||||
transport = new CapturingTransport();
|
||||
clusterService = createClusterService(THREAD_POOL);
|
||||
final TransportService transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName());
|
||||
transportService = new TransportService(transport, THREAD_POOL, clusterService.state().getClusterName());
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
int numNodes = randomIntBetween(3, 10);
|
||||
|
@ -182,7 +199,17 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
stateBuilder.nodes(discoBuilder);
|
||||
ClusterState clusterState = stateBuilder.build();
|
||||
setState(clusterService, clusterState);
|
||||
action = new TestTransportNodesAction(
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
clusterService.close();
|
||||
transport.close();
|
||||
}
|
||||
|
||||
public TestTransportNodesAction getTestTransportNodesAction() {
|
||||
return new TestTransportNodesAction(
|
||||
Settings.EMPTY,
|
||||
THREAD_POOL,
|
||||
clusterService,
|
||||
|
@ -194,11 +221,17 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
clusterService.close();
|
||||
transport.close();
|
||||
public FilteringTestTransportNodesAction getFilteringTestTransportNodesAction(TransportService transportService) {
|
||||
return new FilteringTestTransportNodesAction(
|
||||
Settings.EMPTY,
|
||||
THREAD_POOL,
|
||||
clusterService,
|
||||
transportService,
|
||||
new ActionFilters(Collections.emptySet()),
|
||||
TestNodesRequest::new,
|
||||
TestNodeRequest::new,
|
||||
ThreadPool.Names.SAME
|
||||
);
|
||||
}
|
||||
|
||||
private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, Set<DiscoveryNode.Role> roles) {
|
||||
|
@ -243,6 +276,21 @@ public class TransportNodesActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static class FilteringTestTransportNodesAction
|
||||
extends TestTransportNodesAction {
|
||||
|
||||
FilteringTestTransportNodesAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService
|
||||
transportService, ActionFilters actionFilters, Supplier<TestNodesRequest> request,
|
||||
Supplier<TestNodeRequest> nodeRequest, String nodeExecutor) {
|
||||
super(settings, threadPool, clusterService, transportService, actionFilters, request, nodeRequest, nodeExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
|
||||
return nodes.getDataNodes().keys().toArray(String.class);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestNodesRequest extends BaseNodesRequest<TestNodesRequest> {
|
||||
TestNodesRequest(String... nodesIds) {
|
||||
super(nodesIds);
|
||||
|
|
Loading…
Reference in New Issue