Break out clear scroll logic from TransportClearScrollAction (#25125)

This change extracts the main logic from `TransportClearScrollAction`
into a new class `ClearScrollController` and adds a corresponding unit test.

Relates to #25094
This commit is contained in:
Simon Willnauer 2017-06-08 11:13:08 +02:00 committed by GitHub
parent bdc3a16fa4
commit d6d416cacc
5 changed files with 408 additions and 114 deletions

View File

@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
final class ClearScrollController implements Runnable {
private final DiscoveryNodes nodes;
private final SearchTransportService searchTransportService;
private final CountDown expectedOps;
private final ActionListener<ClearScrollResponse> listener;
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
private final AtomicInteger freedSearchContexts = new AtomicInteger(0);
private final Logger logger;
private final Runnable runner;
ClearScrollController(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, DiscoveryNodes nodes, Logger logger,
SearchTransportService searchTransportService) {
this.nodes = nodes;
this.logger = logger;
this.searchTransportService = searchTransportService;
this.listener = listener;
List<String> scrollIds = request.getScrollIds();
final int expectedOps;
if (scrollIds.size() == 1 && "_all".equals(scrollIds.get(0))) {
expectedOps = nodes.getSize();
runner = this::cleanAllScrolls;
} else {
List<ScrollIdForNode> parsedScrollIds = new ArrayList<>();
for (String parsedScrollId : request.getScrollIds()) {
ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext();
for (ScrollIdForNode id : context) {
parsedScrollIds.add(id);
}
}
if (parsedScrollIds.isEmpty()) {
expectedOps = 0;
runner = () -> listener.onResponse(new ClearScrollResponse(true, 0));
} else {
expectedOps = parsedScrollIds.size();
runner = () -> cleanScrollIds(parsedScrollIds);
}
}
this.expectedOps = new CountDown(expectedOps);
}
@Override
public void run() {
runner.run();
}
void cleanAllScrolls() {
for (final DiscoveryNode node : nodes) {
try {
Transport.Connection connection = searchTransportService.getConnection(null, node);
searchTransportService.sendClearAllScrollContexts(connection, new ActionListener<TransportResponse>() {
@Override
public void onResponse(TransportResponse response) {
onFreedContext(true);
}
@Override
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});
} catch (Exception e) {
onFailedFreedContext(e, node);
}
}
}
void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
for (ScrollIdForNode target : parsedScrollIds) {
final DiscoveryNode node = nodes.get(target.getNode());
if (node == null) {
onFreedContext(false);
} else {
try {
Transport.Connection connection = searchTransportService.getConnection(null, node);
searchTransportService.sendFreeContext(connection, target.getScrollId(),
ActionListener.wrap(freed -> onFreedContext(freed.isFreed()),
e -> onFailedFreedContext(e, node)));
} catch (Exception e) {
onFailedFreedContext(e, node);
}
}
}
}
private void onFreedContext(boolean freed) {
if (freed) {
freedSearchContexts.incrementAndGet();
}
if (expectedOps.countDown()) {
boolean succeeded = hasFailed.get() == false;
listener.onResponse(new ClearScrollResponse(succeeded, freedSearchContexts.get()));
}
}
private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e);
if (expectedOps.countDown()) {
listener.onResponse(new ClearScrollResponse(false, freedSearchContexts.get()));
} else {
hasFailed.set(true);
}
}
}

View File

@ -98,14 +98,14 @@ public class SearchTransportService extends AbstractComponent {
}, SearchFreeContextResponse::new)); }, SearchFreeContextResponse::new));
} }
public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener<SearchFreeContextResponse> listener) { public void sendFreeContext(Transport.Connection connection, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), transportService.sendRequest(connection, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
} }
public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener<TransportResponse> listener) { public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE,
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
} }
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,

View File

@ -19,30 +19,16 @@
package org.elasticsearch.action.search; package org.elasticsearch.action.search;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> { public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
@ -53,105 +39,16 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
ClusterService clusterService, ActionFilters actionFilters, ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService) { SearchTransportService searchTransportService) {
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new); super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
ClearScrollRequest::new);
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchTransportService = searchTransportService; this.searchTransportService = searchTransportService;
} }
@Override @Override
protected void doExecute(ClearScrollRequest request, final ActionListener<ClearScrollResponse> listener) { protected void doExecute(ClearScrollRequest request, final ActionListener<ClearScrollResponse> listener) {
new Async(request, listener, clusterService.state()).run(); Runnable runnable = new ClearScrollController(request, listener, clusterService.state().nodes(), logger, searchTransportService);
} runnable.run();
private class Async {
final DiscoveryNodes nodes;
final CountDown expectedOps;
final List<ScrollIdForNode[]> contexts = new ArrayList<>();
final ActionListener<ClearScrollResponse> listener;
final AtomicReference<Throwable> expHolder;
final AtomicInteger numberOfFreedSearchContexts = new AtomicInteger(0);
private Async(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, ClusterState clusterState) {
int expectedOps = 0;
this.nodes = clusterState.nodes();
if (request.getScrollIds().size() == 1 && "_all".equals(request.getScrollIds().get(0))) {
expectedOps = nodes.getSize();
} else {
for (String parsedScrollId : request.getScrollIds()) {
ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext();
expectedOps += context.length;
this.contexts.add(context);
}
}
this.listener = listener;
this.expHolder = new AtomicReference<>();
this.expectedOps = new CountDown(expectedOps);
}
public void run() {
if (expectedOps.isCountedDown()) {
listener.onResponse(new ClearScrollResponse(true, 0));
return;
}
if (contexts.isEmpty()) {
for (final DiscoveryNode node : nodes) {
searchTransportService.sendClearAllScrollContexts(node, new ActionListener<TransportResponse>() {
@Override
public void onResponse(TransportResponse response) {
onFreedContext(true);
}
@Override
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});
}
} else {
for (ScrollIdForNode[] context : contexts) {
for (ScrollIdForNode target : context) {
final DiscoveryNode node = nodes.get(target.getNode());
if (node == null) {
onFreedContext(false);
continue;
}
searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener<SearchTransportService.SearchFreeContextResponse>() {
@Override
public void onResponse(SearchTransportService.SearchFreeContextResponse freed) {
onFreedContext(freed.isFreed());
}
@Override
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});
}
}
}
}
void onFreedContext(boolean freed) {
if (freed) {
numberOfFreedSearchContexts.incrementAndGet();
}
if (expectedOps.countDown()) {
boolean succeeded = expHolder.get() == null;
listener.onResponse(new ClearScrollResponse(succeeded, numberOfFreedSearchContexts.get()));
}
}
void onFailedFreedContext(Throwable e, DiscoveryNode node) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e);
if (expectedOps.countDown()) {
listener.onResponse(new ClearScrollResponse(false, numberOfFreedSearchContexts.get()));
} else {
expHolder.set(e);
}
}
} }
} }

View File

@ -0,0 +1,256 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class ClearScrollControllerTests extends ESTestCase {
public void testClearAll() throws IOException, InterruptedException {
DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<ClearScrollResponse> listener = new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
try {
assertEquals(3, clearScrollResponse.getNumFreed());
assertTrue(clearScrollResponse.isSucceeded());
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
throw new AssertionError(e);
} finally {
latch.countDown();
}
}
};
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) {
@Override
public void sendClearAllScrollContexts(Transport.Connection connection, ActionListener<TransportResponse> listener) {
nodesInvoked.add(connection.getNode());
Thread t = new Thread(() -> listener.onResponse(TransportResponse.Empty.INSTANCE)); // response is unused
t.start();
}
@Override
Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return new SearchAsyncActionTests.MockConnection(node);
}
};
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.scrollIds(Arrays.asList("_all"));
ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener,
nodes, logger, searchTransportService);
controller.run();
latch.await();
assertEquals(3, nodesInvoked.size());
Collections.sort(nodesInvoked, Comparator.comparing(DiscoveryNode::getId));
assertEquals(nodesInvoked, Arrays.asList(node1, node2, node3));
}
public void testClearScrollIds() throws IOException, InterruptedException {
DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
AtomicArray<SearchPhaseResult> array = new AtomicArray<>(3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1);
testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2);
testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3);
testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
array.setOnce(0, testSearchPhaseResult1);
array.setOnce(1, testSearchPhaseResult2);
array.setOnce(2, testSearchPhaseResult3);
AtomicInteger numFreed = new AtomicInteger(0);
String scrollId = TransportSearchHelper.buildScrollId(array);
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<ClearScrollResponse> listener = new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
try {
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
assertTrue(clearScrollResponse.isSucceeded());
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
throw new AssertionError(e);
} finally {
latch.countDown();
}
}
};
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId,
ActionListener<SearchFreeContextResponse> listener) {
nodesInvoked.add(connection.getNode());
boolean freed = randomBoolean();
if (freed) {
numFreed.incrementAndGet();
}
Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(freed)));
t.start();
}
@Override
Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return new SearchAsyncActionTests.MockConnection(node);
}
};
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.scrollIds(Arrays.asList(scrollId));
ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener,
nodes, logger, searchTransportService);
controller.run();
latch.await();
assertEquals(3, nodesInvoked.size());
Collections.sort(nodesInvoked, Comparator.comparing(DiscoveryNode::getId));
assertEquals(nodesInvoked, Arrays.asList(node1, node2, node3));
}
public void testClearScrollIdsWithFailure() throws IOException, InterruptedException {
DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
AtomicArray<SearchPhaseResult> array = new AtomicArray<>(3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1);
testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2);
testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3);
testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
array.setOnce(0, testSearchPhaseResult1);
array.setOnce(1, testSearchPhaseResult2);
array.setOnce(2, testSearchPhaseResult3);
AtomicInteger numFreed = new AtomicInteger(0);
AtomicInteger numFailures = new AtomicInteger(0);
AtomicInteger numConnectionFailures = new AtomicInteger(0);
String scrollId = TransportSearchHelper.buildScrollId(array);
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<ClearScrollResponse> listener = new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
try {
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
if (numFailures.get() > 0) {
assertFalse(clearScrollResponse.isSucceeded());
} else {
assertTrue(clearScrollResponse.isSucceeded());
}
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
throw new AssertionError(e);
} finally {
latch.countDown();
}
}
};
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId,
ActionListener<SearchFreeContextResponse> listener) {
nodesInvoked.add(connection.getNode());
boolean freed = randomBoolean();
boolean fail = randomBoolean();
Thread t = new Thread(() -> {
if (fail) {
numFailures.incrementAndGet();
listener.onFailure(new IllegalArgumentException("boom"));
} else {
if (freed) {
numFreed.incrementAndGet();
}
listener.onResponse(new SearchFreeContextResponse(freed));
}
});
t.start();
}
@Override
Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
if (randomBoolean()) {
numFailures.incrementAndGet();
numConnectionFailures.incrementAndGet();
throw new NodeNotConnectedException(node, "boom");
}
return new SearchAsyncActionTests.MockConnection(node);
}
};
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.scrollIds(Arrays.asList(scrollId));
ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener,
nodes, logger, searchTransportService);
controller.run();
latch.await();
assertEquals(3 - numConnectionFailures.get(), nodesInvoked.size());
}
}

View File

@ -214,7 +214,7 @@ public class SearchAsyncActionTests extends ESTestCase {
} }
} }
public final class MockConnection implements Transport.Connection { public static final class MockConnection implements Transport.Connection {
private final DiscoveryNode node; private final DiscoveryNode node;