Core: Convert TransportAction.execute uses to client calls (#31487)

This commit converts some of the existing calls to
TransportAction.execute to use the equivalent client method for the
desired action.
This commit is contained in:
Ryan Ernst 2018-06-21 07:59:55 -07:00 committed by GitHub
parent da69ab28c7
commit 0a324b9943
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 83 additions and 120 deletions

View File

@ -23,9 +23,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -42,16 +42,16 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
private final ScriptService scriptService;
private final NamedXContentRegistry xContentRegistry;
private final TransportMultiSearchAction multiSearchAction;
private final NodeClient client;
@Inject
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, TransportMultiSearchAction multiSearchAction) {
NamedXContentRegistry xContentRegistry, NodeClient client) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, MultiSearchTemplateRequest::new);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.multiSearchAction = multiSearchAction;
this.client = client;
}
@Override
@ -81,7 +81,7 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
}
}
multiSearchAction.execute(multiSearchRequest, ActionListener.wrap(r -> {
client.multiSearch(multiSearchRequest, ActionListener.wrap(r -> {
for (int i = 0; i < r.getResponses().length; i++) {
MultiSearchResponse.Item item = r.getResponses()[i];
int originalSlot = originalSlots.get(i);

View File

@ -22,9 +22,9 @@ package org.elasticsearch.script.mustache;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -50,20 +50,18 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
private static final String TEMPLATE_LANG = MustacheScriptEngine.NAME;
private final ScriptService scriptService;
private final TransportSearchAction searchAction;
private final NamedXContentRegistry xContentRegistry;
private final NodeClient client;
@Inject
public TransportSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters,
ScriptService scriptService,
TransportSearchAction searchAction,
NamedXContentRegistry xContentRegistry) {
ActionFilters actionFilters, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
NodeClient client) {
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
this.scriptService = scriptService;
this.searchAction = searchAction;
this.xContentRegistry = xContentRegistry;
this.client = client;
}
@Override
@ -72,7 +70,7 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
if (searchRequest != null) {
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {

View File

@ -55,8 +55,7 @@ public class CreateIndexResponse extends ShardsAcknowledgedResponse {
private String index;
protected CreateIndexResponse() {
}
public CreateIndexResponse() {}
protected CreateIndexResponse(boolean acknowledged, boolean shardsAcknowledged, String index) {
super(acknowledged, shardsAcknowledged);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.PrimaryMissingActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -58,16 +59,15 @@ import java.util.Set;
public class TransportUpgradeAction extends TransportBroadcastByNodeAction<UpgradeRequest, UpgradeResponse, ShardUpgradeResult> {
private final IndicesService indicesService;
private final TransportUpgradeSettingsAction upgradeSettingsAction;
private final NodeClient client;
@Inject
public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) {
IndexNameExpressionResolver indexNameExpressionResolver, NodeClient client) {
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest::new, ThreadPool.Names.FORCE_MERGE);
this.indicesService = indicesService;
this.upgradeSettingsAction = upgradeSettingsAction;
this.client = client;
}
@Override
@ -205,7 +205,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
upgradeSettingsAction.execute(upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
@Override
public void onResponse(UpgradeSettingsResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);

View File

@ -30,7 +30,6 @@ import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
@ -38,6 +37,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -88,27 +88,24 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final ClusterService clusterService;
private final IngestService ingestService;
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider;
private final IngestActionForwarder ingestForwarder;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
this(settings, threadPool, transportService, clusterService, ingestService,
shardBulkAction, createIndexAction,
actionFilters, indexNameExpressionResolver,
autoCreateIndex,
System::nanoTime);
this(settings, threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, System::nanoTime);
}
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
@ -116,10 +113,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
this.clusterService = clusterService;
this.ingestService = ingestService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
clusterService.addStateApplier(this.ingestForwarder);
}
@ -224,7 +221,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexAction.execute(createIndexRequest, listener);
client.admin().indices().create(createIndexRequest, listener);
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest request, String index, Exception e) {

View File

@ -23,9 +23,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -47,16 +47,16 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
private final PipelineStore pipelineStore;
private final ClusterService clusterService;
private final TransportNodesInfoAction nodesInfoAction;
private final NodeClient client;
@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
TransportNodesInfoAction nodesInfoAction) {
NodeClient client) {
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.clusterService = clusterService;
this.nodesInfoAction = nodesInfoAction;
this.client = client;
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
}
@ -75,7 +75,7 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.ingest(true);
nodesInfoAction.execute(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
@Override
public void onResponse(NodesInfoResponse nodeInfos) {
try {

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
@ -43,27 +43,27 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;
private final NodeClient client;
@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
ClusterService clusterService, ActionFilters actionFilters, NodeClient client) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
this.client = client;
}
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
int availableProcessors, LongSupplier relativeTimeProvider) {
ClusterService clusterService, int availableProcessors,
LongSupplier relativeTimeProvider, NodeClient client) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.client = client;
}
@Override
@ -141,7 +141,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
* when we handle the response rather than going recursive, we fork to another thread, otherwise we recurse.
*/
final Thread thread = Thread.currentThread();
searchAction.execute(request.request, new ActionListener<SearchResponse>() {
client.search(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));

View File

@ -24,8 +24,6 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -34,6 +32,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
@ -66,22 +65,21 @@ import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.w
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
private final TransportBulkAction bulkAction;
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
private final IndicesService indicesService;
private final NodeClient client;
@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportBulkAction bulkAction, TransportCreateIndexAction createIndexAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, AutoCreateIndex autoCreateIndex) {
UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService,
AutoCreateIndex autoCreateIndex, NodeClient client) {
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new);
this.bulkAction = bulkAction;
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.indicesService = indicesService;
this.autoCreateIndex = autoCreateIndex;
this.client = client;
}
@Override
@ -116,7 +114,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
client.admin().indices().create(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
@ -177,7 +175,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
IndexRequest upsertRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
@ -197,7 +195,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
IndexRequest indexRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
@ -208,7 +206,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
break;
case DELETED:
DeleteRequest deleteRequest = result.action();
bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
client.bulk(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));

View File

@ -25,7 +25,6 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
@ -69,15 +68,11 @@ public class TaskResultsService extends AbstractComponent {
private final ClusterService clusterService;
private final TransportCreateIndexAction createIndexAction;
@Inject
public TaskResultsService(Settings settings, Client client, ClusterService clusterService,
TransportCreateIndexAction createIndexAction) {
public TaskResultsService(Settings settings, Client client, ClusterService clusterService) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.createIndexAction = createIndexAction;
}
public void storeResult(TaskResult taskResult, ActionListener<Void> listener) {
@ -91,7 +86,7 @@ public class TaskResultsService extends AbstractComponent {
createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping(), XContentType.JSON);
createIndexRequest.cause("auto(task api)");
createIndexAction.execute(null, createIndexRequest, new ActionListener<CreateIndexResponse>() {
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
doStoreResult(taskResult, listener);

View File

@ -21,16 +21,17 @@
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -99,14 +100,13 @@ public class TransportBulkActionTookTests extends ESTestCase {
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);
ActionFilters actionFilters = new ActionFilters(new HashSet<>());
TransportCreateIndexAction createIndexAction = new TransportCreateIndexAction(
Settings.EMPTY,
transportService,
clusterService,
threadPool,
null,
actionFilters,
resolver);
NodeClient client = new NodeClient(Settings.EMPTY, threadPool) {
@Override
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
listener.onResponse((Response)new CreateIndexResponse());
}
};
if (controlled) {
@ -116,7 +116,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
transportService,
clusterService,
null,
createIndexAction,
client,
actionFilters,
resolver,
null,
@ -141,7 +141,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
transportService,
clusterService,
null,
createIndexAction,
client,
actionFilters,
resolver,
null,
@ -223,7 +223,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
TransportService transportService,
ClusterService clusterService,
TransportShardBulkAction shardBulkAction,
TransportCreateIndexAction createIndexAction,
NodeClient client,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
@ -235,7 +235,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
clusterService,
null,
shardBulkAction,
createIndexAction,
client,
actionFilters,
indexNameExpressionResolver,
autoCreateIndex,
@ -253,24 +253,4 @@ public class TransportBulkActionTookTests extends ESTestCase {
}
}
static class TestTransportCreateIndexAction extends TransportCreateIndexAction {
TestTransportCreateIndexAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(Task task, CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) {
listener.onResponse(newResponse());
}
}
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -148,10 +148,9 @@ public class MultiSearchActionTookTests extends ESTestCase {
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>(Settings.EMPTY,
"action", threadPool, actionFilters, taskManager) {
NodeClient client = new NodeClient(settings, threadPool) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
requests.add(request);
commonExecutor.execute(() -> {
counter.decrementAndGet();
@ -161,8 +160,8 @@ public class MultiSearchActionTookTests extends ESTestCase {
};
if (controlledClock) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction,
availableProcessors, expected::get) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, availableProcessors,
expected::get, client) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener, long startTimeInNanos) {
@ -171,9 +170,8 @@ public class MultiSearchActionTookTests extends ESTestCase {
}
};
} else {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction,
availableProcessors, System::nanoTime) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService,
availableProcessors, System::nanoTime, client) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener, long startTimeInNanos) {

View File

@ -24,7 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -107,15 +107,14 @@ public class TransportMultiSearchActionTests extends ESTestCase {
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>
(Settings.EMPTY, "action", threadPool, actionFilters, taskManager) {
NodeClient client = new NodeClient(settings, threadPool) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
requests.add(request);
int currentConcurrentSearches = counter.incrementAndGet();
if (currentConcurrentSearches > maxAllowedConcurrentSearches) {
errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches +
"] is higher than is allowed [" + maxAllowedConcurrentSearches + "]"));
"] is higher than is allowed [" + maxAllowedConcurrentSearches + "]"));
}
final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor;
executorService.execute(() -> {
@ -126,8 +125,7 @@ public class TransportMultiSearchActionTests extends ESTestCase {
};
TransportMultiSearchAction action =
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, 10,
System::nanoTime);
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client);
// Execute the multi search api and fail if we find an error after executing:
try {

View File

@ -13,9 +13,9 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -65,7 +65,7 @@ import java.util.function.Supplier;
*/
public class TransportGraphExploreAction extends HandledTransportAction<GraphExploreRequest, GraphExploreResponse> {
private final TransportSearchAction searchAction;
private final NodeClient client;
protected final XPackLicenseState licenseState;
static class VertexPriorityQueue extends PriorityQueue<Vertex> {
@ -82,12 +82,12 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
}
@Inject
public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, TransportSearchAction transportSearchAction,
public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, NodeClient client,
TransportService transportService, ActionFilters actionFilters,
XPackLicenseState licenseState) {
super(settings, GraphExploreAction.NAME, threadPool, transportService, actionFilters,
(Supplier<GraphExploreRequest>)GraphExploreRequest::new);
this.searchAction = transportSearchAction;
this.client = client;
this.licenseState = licenseState;
}
@ -313,7 +313,7 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
// System.out.println(source);
logger.trace("executing expansion graph search request");
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
// System.out.println(searchResponse);
@ -660,7 +660,7 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
searchRequest.source(source);
// System.out.println(source);
logger.trace("executing initial graph search request");
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
addShardFailures(searchResponse.getShardFailures());