mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-26 06:46:10 +00:00
* Cleanup Various Uses of ActionListener * Use shorter `map`, `runAfter` or `wrap` where functionally equivalent to anonymous class * Use ActionRunnable where functionally equivalent
This commit is contained in:
parent
a4e6fb4dd2
commit
ecd033bea6
@ -203,27 +203,15 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
|
||||
request.getTaskId().toString());
|
||||
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
|
||||
|
||||
client.get(get, new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse getResponse) {
|
||||
try {
|
||||
onGetFinishedTaskFromIndex(getResponse, listener);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
client.get(get, ActionListener.wrap(r -> onGetFinishedTaskFromIndex(r, listener), e -> {
|
||||
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
|
||||
// We haven't yet created the index for the task results so it can't be found.
|
||||
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
|
||||
request.getTaskId()));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
|
||||
// We haven't yet created the index for the task results so it can't be found.
|
||||
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
|
||||
request.getTaskId()));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -119,23 +119,11 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
||||
TransportNodesSnapshotsStatus.Request nodesRequest =
|
||||
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
|
||||
.snapshots(snapshots).timeout(request.masterNodeTimeout());
|
||||
transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
|
||||
@Override
|
||||
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
|
||||
try {
|
||||
List<SnapshotsInProgress.Entry> currentSnapshots =
|
||||
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
|
||||
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
transportNodesSnapshotsStatus.execute(nodesRequest,
|
||||
ActionListener.map(
|
||||
listener, nodeSnapshotStatuses ->
|
||||
buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
|
||||
nodeSnapshotStatuses)));
|
||||
} else {
|
||||
// We don't have any in-progress shards, just return current stats
|
||||
listener.onResponse(buildResponse(request, currentSnapshots, null));
|
||||
|
@ -184,26 +184,13 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
|
||||
ActionListener<UpgradeResponse> settingsUpdateListener = new ActionListener<UpgradeResponse>() {
|
||||
@Override
|
||||
public void onResponse(UpgradeResponse upgradeResponse) {
|
||||
try {
|
||||
if (upgradeResponse.versions().isEmpty()) {
|
||||
listener.onResponse(upgradeResponse);
|
||||
} else {
|
||||
updateSettings(upgradeResponse, listener);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
super.doExecute(task, request, ActionListener.wrap(upgradeResponse -> {
|
||||
if (upgradeResponse.versions().isEmpty()) {
|
||||
listener.onResponse(upgradeResponse);
|
||||
} else {
|
||||
updateSettings(upgradeResponse, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
super.doExecute(task, request, settingsUpdateListener);
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
|
||||
|
@ -59,27 +59,20 @@ public final class BulkRequestHandler {
|
||||
semaphore.acquire();
|
||||
toRelease = semaphore::release;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
|
||||
retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
try {
|
||||
listener.afterBulk(executionId, bulkRequest, response);
|
||||
} finally {
|
||||
semaphore.release();
|
||||
latch.countDown();
|
||||
}
|
||||
listener.afterBulk(executionId, bulkRequest, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
listener.afterBulk(executionId, bulkRequest, e);
|
||||
} finally {
|
||||
semaphore.release();
|
||||
latch.countDown();
|
||||
}
|
||||
listener.afterBulk(executionId, bulkRequest, e);
|
||||
}
|
||||
});
|
||||
}, () -> {
|
||||
semaphore.release();
|
||||
latch.countDown();
|
||||
}));
|
||||
bulkRequestSetupSuccessful = true;
|
||||
if (concurrentRequests == 0) {
|
||||
latch.await();
|
||||
|
@ -22,7 +22,6 @@ package org.elasticsearch.action.ingest;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
@ -74,25 +73,13 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
|
||||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
|
||||
nodesInfoRequest.clear();
|
||||
nodesInfoRequest.ingest(true);
|
||||
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
|
||||
@Override
|
||||
public void onResponse(NodesInfoResponse nodeInfos) {
|
||||
try {
|
||||
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
|
||||
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
|
||||
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
|
||||
}
|
||||
ingestService.putPipeline(ingestInfos, request, listener);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> {
|
||||
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
|
||||
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
|
||||
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
ingestService.putPipeline(ingestInfos, request, listener);
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.support;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -86,21 +87,16 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
|
||||
|
||||
@Override
|
||||
public void onResponse(final Response response) {
|
||||
threadPool.executor(executor).execute(new AbstractRunnable() {
|
||||
threadPool.executor(executor).execute(new ActionRunnable<Response>(listener) {
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return forceExecution;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
protected void doRun() {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.support.broadcast;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
@ -36,7 +37,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
@ -287,45 +287,25 @@ public abstract class TransportBroadcastAction<
|
||||
|
||||
@Override
|
||||
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
|
||||
asyncShardOperation(request, task, new ActionListener<ShardResponse>() {
|
||||
@Override
|
||||
public void onResponse(ShardResponse response) {
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
asyncShardOperation(request, task,
|
||||
ActionListener.wrap(channel::sendResponse, e -> {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception e1) {
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception e1) {
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
|
||||
transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable<ShardResponse>(listener) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onResponse(shardOperation(request, task));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected String getExecutor(ShardRequest request) {
|
||||
return shardExecutor;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -254,27 +254,16 @@ public abstract class TransportInstanceSingleOperationAction<
|
||||
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
shardOperation(request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
shardOperation(request,
|
||||
ActionListener.wrap(channel::sendResponse, e -> {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(e);
|
||||
logger.warn("failed to send response for get", inner);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(e);
|
||||
logger.warn("failed to send response for get", inner);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.support.single.shard;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ChannelActionListener;
|
||||
@ -40,7 +41,6 @@ import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.logging.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
@ -107,12 +107,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
||||
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
|
||||
|
||||
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
|
||||
threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable<Response>(listener) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onResponse(shardOperation(request, shardId));
|
||||
|
@ -329,19 +329,8 @@ public abstract class TransportTasksAction<
|
||||
|
||||
@Override
|
||||
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception {
|
||||
nodeOperation(request, new ActionListener<NodeTasksResponse>() {
|
||||
@Override
|
||||
public void onResponse(
|
||||
TransportTasksAction<OperationTask, TasksRequest, TasksResponse, TaskResponse>.NodeTasksResponse response) {
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
nodeOperation(request, ActionListener.wrap(channel::sendResponse,
|
||||
e -> {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
@ -349,11 +338,10 @@ public abstract class TransportTasksAction<
|
||||
logger.warn("Failed to send failure", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private class NodeTaskRequest extends TransportRequest {
|
||||
private TasksRequest tasksRequest;
|
||||
|
||||
|
@ -25,6 +25,7 @@ import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
import org.elasticsearch.action.search.SearchTask;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
@ -39,7 +40,6 @@ import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
@ -302,21 +302,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||
}
|
||||
|
||||
public void executeDfsPhase(ShardSearchRequest request, SearchTask task, ActionListener<SearchPhaseResult> listener) {
|
||||
rewriteShardRequest(request, new ActionListener<ShardSearchRequest>() {
|
||||
@Override
|
||||
public void onResponse(ShardSearchRequest request) {
|
||||
try {
|
||||
listener.onResponse(executeDfsPhase(request, task));
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task)));
|
||||
}
|
||||
|
||||
private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException {
|
||||
@ -351,30 +337,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||
}
|
||||
|
||||
public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener<SearchPhaseResult> listener) {
|
||||
rewriteShardRequest(request, new ActionListener<ShardSearchRequest>() {
|
||||
@Override
|
||||
public void onResponse(ShardSearchRequest request) {
|
||||
try {
|
||||
listener.onResponse(executeQueryPhase(request, task));
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task)));
|
||||
}
|
||||
|
||||
private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
|
||||
getExecutor(id).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
getExecutor(id).execute(new ActionRunnable<T>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
listener.onResponse(executable.get());
|
||||
@ -1058,12 +1025,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
|
||||
// now we need to check if there is a pending refresh and register
|
||||
shard.awaitShardSearchActive(b ->
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
executor.execute(new ActionRunnable<ShardSearchRequest>(listener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
listener.onResponse(request);
|
||||
|
@ -84,10 +84,7 @@ final class TransportKeepAlive implements Closeable {
|
||||
|
||||
for (TcpChannel channel : nodeChannels) {
|
||||
scheduledPing.addChannel(channel);
|
||||
|
||||
channel.addCloseListener(ActionListener.wrap(() -> {
|
||||
scheduledPing.removeChannel(channel);
|
||||
}));
|
||||
channel.addCloseListener(ActionListener.wrap(() -> scheduledPing.removeChannel(channel)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,19 +65,17 @@ public class RejectionActionIT extends ESIntegTestCase {
|
||||
client().prepareSearch("test")
|
||||
.setSearchType(SearchType.QUERY_THEN_FETCH)
|
||||
.setQuery(QueryBuilders.matchQuery("field", "1"))
|
||||
.execute(new ActionListener<SearchResponse>() {
|
||||
.execute(new LatchedActionListener<>(new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
responses.add(searchResponse);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
responses.add(e);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}, latch));
|
||||
}
|
||||
latch.await();
|
||||
|
||||
|
@ -470,17 +470,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch checkLatch = new CountDownLatch(1);
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
Task task = startBlockingTestNodesAction(checkLatch, new ActionListener<NodesResponse>() {
|
||||
@Override
|
||||
public void onResponse(NodesResponse nodeResponses) {
|
||||
responseLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
responseLatch.countDown();
|
||||
}
|
||||
});
|
||||
Task task = startBlockingTestNodesAction(checkLatch, ActionListener.wrap(responseLatch::countDown));
|
||||
String actionName = "internal:testAction"; // only pick the main action
|
||||
|
||||
// Try to cancel main task using action name
|
||||
|
@ -20,6 +20,7 @@ package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
@ -42,32 +43,24 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ClearScrollControllerTests extends ESTestCase {
|
||||
|
||||
public void testClearAll() throws IOException, InterruptedException {
|
||||
public void testClearAll() throws InterruptedException {
|
||||
DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
ActionListener<ClearScrollResponse> listener = new ActionListener<ClearScrollResponse>() {
|
||||
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClearScrollResponse clearScrollResponse) {
|
||||
try {
|
||||
assertEquals(3, clearScrollResponse.getNumFreed());
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
assertEquals(3, clearScrollResponse.getNumFreed());
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
};
|
||||
}, latch);
|
||||
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
|
||||
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
|
||||
@Override
|
||||
@ -112,27 +105,18 @@ public class ClearScrollControllerTests extends ESTestCase {
|
||||
String scrollId = TransportSearchHelper.buildScrollId(array);
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
ActionListener<ClearScrollResponse> listener = new ActionListener<ClearScrollResponse>() {
|
||||
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClearScrollResponse clearScrollResponse) {
|
||||
try {
|
||||
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
};
|
||||
}, latch);
|
||||
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
|
||||
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
|
||||
|
||||
@ -185,32 +169,22 @@ public class ClearScrollControllerTests extends ESTestCase {
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
ActionListener<ClearScrollResponse> listener = new ActionListener<ClearScrollResponse>() {
|
||||
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClearScrollResponse clearScrollResponse) {
|
||||
try {
|
||||
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
|
||||
if (numFailures.get() > 0) {
|
||||
assertFalse(clearScrollResponse.isSucceeded());
|
||||
} else {
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
}
|
||||
|
||||
} finally {
|
||||
latch.countDown();
|
||||
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
|
||||
if (numFailures.get() > 0) {
|
||||
assertFalse(clearScrollResponse.isSucceeded());
|
||||
} else {
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
throw new AssertionError(e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
};
|
||||
}, latch);
|
||||
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
|
||||
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
|
||||
|
||||
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
@ -65,7 +66,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
||||
terminate(threadPool);
|
||||
}
|
||||
|
||||
public void testActionFiltersRequest() throws ExecutionException, InterruptedException {
|
||||
public void testActionFiltersRequest() throws InterruptedException {
|
||||
int numFilters = randomInt(10);
|
||||
Set<Integer> orders = new HashSet<>(numFilters);
|
||||
while (orders.size() < numFilters) {
|
||||
@ -139,7 +140,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testTooManyContinueProcessingRequest() throws ExecutionException, InterruptedException {
|
||||
public void testTooManyContinueProcessingRequest() throws InterruptedException {
|
||||
final int additionalContinueCount = randomInt(10);
|
||||
|
||||
RequestTestFilter testFilter = new RequestTestFilter(randomInt(), new RequestCallback() {
|
||||
@ -169,19 +170,17 @@ public class TransportActionFilterChainTests extends ESTestCase {
|
||||
final AtomicInteger responses = new AtomicInteger();
|
||||
final List<Throwable> failures = new CopyOnWriteArrayList<>();
|
||||
|
||||
transportAction.execute(new TestRequest(), new ActionListener<TestResponse>() {
|
||||
transportAction.execute(new TestRequest(), new LatchedActionListener<>(new ActionListener<TestResponse>() {
|
||||
@Override
|
||||
public void onResponse(TestResponse testResponse) {
|
||||
responses.incrementAndGet();
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failures.add(e);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}, latch));
|
||||
|
||||
if (!latch.await(10, TimeUnit.SECONDS)) {
|
||||
fail("timeout waiting for the filter to notify the listener as many times as expected");
|
||||
|
@ -369,17 +369,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
||||
CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
threadPool.generic().execute(() -> {
|
||||
waitForBarrier.run();
|
||||
replicaResult.respond(new ActionListener<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public void onResponse(TransportResponse.Empty empty) {
|
||||
completionLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
completionLatch.countDown();
|
||||
}
|
||||
});
|
||||
replicaResult.respond(ActionListener.wrap(completionLatch::countDown));
|
||||
});
|
||||
if (randomBoolean()) {
|
||||
threadPool.generic().execute(() -> {
|
||||
|
@ -23,9 +23,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.persistent.PersistentTaskParams;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
||||
@ -72,17 +70,7 @@ public class EnableAssignmentDeciderIT extends ESIntegTestCase {
|
||||
for (int i = 0; i < numberOfTasks; i++) {
|
||||
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
|
||||
service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)),
|
||||
new ActionListener<PersistentTask<PersistentTaskParams>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<PersistentTaskParams> task) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
ActionListener.wrap(latch::countDown));
|
||||
}
|
||||
latch.await();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user