Automatically thread client based action listeners

Today, we rely on the user to set request listener threads to true when they are on the client side in order not to block the IO threads on heavy operations. This proves to be very trappy for users, and end up creating problems that are very hard to debug.
Instead, we can do the right thing, and automatically thread listeners that are used from the client when the client is a node client or a transport client.
This change also removes the ability to set request level listener threading, in the effort of simplifying the code path and reasoning around when something is threaded and when it is not.
closes #10940
This commit is contained in:
Shay Banon 2015-05-03 23:58:39 +02:00
parent 23ac32e616
commit b87d360e79
85 changed files with 300 additions and 332 deletions

View File

@ -30,8 +30,6 @@ import java.io.IOException;
*/ */
public abstract class ActionRequest<T extends ActionRequest> extends TransportRequest { public abstract class ActionRequest<T extends ActionRequest> extends TransportRequest {
private boolean listenerThreaded = false;
protected ActionRequest() { protected ActionRequest() {
super(); super();
} }
@ -43,25 +41,6 @@ public abstract class ActionRequest<T extends ActionRequest> extends TransportRe
//this.listenerThreaded = request.listenerThreaded(); //this.listenerThreaded = request.listenerThreaded();
} }
/**
* Should the response listener be executed on a thread or not.
* <p/>
* <p>When not executing on a thread, it will either be executed on the calling thread, or
* on an expensive, IO based, thread.
*/
public final boolean listenerThreaded() {
return this.listenerThreaded;
}
/**
* Sets if the response listener be executed on a thread or not.
*/
@SuppressWarnings("unchecked")
public final T listenerThreaded(boolean listenerThreaded) {
this.listenerThreaded = listenerThreaded;
return (T) this;
}
public abstract ActionRequestValidationException validate(); public abstract ActionRequestValidationException validate();
@Override @Override

View File

@ -48,12 +48,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
return this.request; return this.request;
} }
@SuppressWarnings("unchecked")
public final RequestBuilder setListenerThreaded(boolean listenerThreaded) {
request.listenerThreaded(listenerThreaded);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final RequestBuilder putHeader(String key, Object value) { public final RequestBuilder putHeader(String key, Object value) {
request.putHeader(key, value); request.putHeader(key, value);
@ -61,7 +55,7 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
} }
public ListenableActionFuture<Response> execute() { public ListenableActionFuture<Response> execute() {
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(request.listenerThreaded(), threadPool); PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool);
execute(future); execute(future);
return future; return future;
} }

View File

@ -24,20 +24,15 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
/** /**
* A generic proxy that will execute the given action against a specific node. * A generic proxy that will execute the given action against a specific node.
*/ */
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent { public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
protected final TransportService transportService; private final TransportService transportService;
private final GenericAction<Request, Response> action; private final GenericAction<Request, Response> action;
private final TransportRequestOptions transportOptions; private final TransportRequestOptions transportOptions;
@Inject @Inject
@ -48,36 +43,17 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
this.transportOptions = action.transportOptions(settings); this.transportOptions = action.transportOptions(settings);
} }
public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) { public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate(); ActionRequestValidationException validationException = request.validate();
if (validationException != null) { if (validationException != null) {
listener.onFailure(validationException); listener.onFailure(validationException);
return; return;
} }
transportService.sendRequest(node, action.name(), request, transportOptions, new BaseTransportResponseHandler<Response>() { transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
@Override @Override
public Response newInstance() { public Response newInstance() {
return action.newResponse(); return action.newResponse();
} }
@Override
public String executor() {
if (request.listenerThreaded()) {
return ThreadPool.Names.LISTENER;
}
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
}); });
} }
} }

View File

@ -64,8 +64,6 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1; boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1;
for (final String index : concreteIndices) { for (final String index : concreteIndices) {
GetFieldMappingsIndexRequest shardRequest = new GetFieldMappingsIndexRequest(request, index, probablySingleFieldRequest); GetFieldMappingsIndexRequest shardRequest = new GetFieldMappingsIndexRequest(request, index, probablySingleFieldRequest);
// no threading needed, all is done on the index replication one
shardRequest.listenerThreaded(false);
shardAction.execute(shardRequest, new ActionListener<GetFieldMappingsResponse>() { shardAction.execute(shardRequest, new ActionListener<GetFieldMappingsResponse>() {
@Override @Override
public void onResponse(GetFieldMappingsResponse result) { public void onResponse(GetFieldMappingsResponse result) {

View File

@ -119,7 +119,6 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
.type(request.type()) .type(request.type())
.id(request.id()) .id(request.id())
.routing(request.routing()) .routing(request.routing())
.listenerThreaded(true)
.operationThreaded(true); .operationThreaded(true);
getAction.execute(getRequest, new ActionListener<GetResponse>() { getAction.execute(getRequest, new ActionListener<GetResponse>() {
@ -197,8 +196,7 @@ public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLike
SearchRequest searchRequest = new SearchRequest(request).indices(searchIndices) SearchRequest searchRequest = new SearchRequest(request).indices(searchIndices)
.types(searchTypes) .types(searchTypes)
.searchType(request.searchType()) .searchType(request.searchType())
.scroll(request.searchScroll()) .scroll(request.searchScroll());
.listenerThreaded(request.listenerThreaded());
SearchSourceBuilder extraSource = searchSource().query(boolBuilder); SearchSourceBuilder extraSource = searchSource().query(boolBuilder);
if (request.searchFrom() != 0) { if (request.searchFrom() != 0) {

View File

@ -38,14 +38,6 @@ public class SearchScrollRequestBuilder extends ActionRequestBuilder<SearchScrol
super(client, new SearchScrollRequest(scrollId)); super(client, new SearchScrollRequest(scrollId));
} }
/**
* Should the listener be called on a separate thread if needed.
*/
public SearchScrollRequestBuilder listenerThreaded(boolean threadedListener) {
request.listenerThreaded(threadedListener);
return this;
}
/** /**
* The scroll id to use to continue scrolling. * The scroll id to use to continue scrolling.
*/ */

View File

@ -20,10 +20,10 @@
package org.elasticsearch.action.support; package org.elasticsearch.action.support;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.List; import java.util.List;
@ -33,20 +33,16 @@ import java.util.List;
*/ */
public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> { public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {
final boolean listenerThreaded; private final static ESLogger logger = Loggers.getLogger(AbstractListenableActionFuture.class);
final ThreadPool threadPool; final ThreadPool threadPool;
volatile Object listeners; volatile Object listeners;
boolean executedListeners = false; boolean executedListeners = false;
protected AbstractListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) { protected AbstractListenableActionFuture(ThreadPool threadPool) {
this.listenerThreaded = listenerThreaded;
this.threadPool = threadPool; this.threadPool = threadPool;
} }
public boolean listenerThreaded() {
return false; // we control execution of the listener
}
public ThreadPool threadPool() { public ThreadPool threadPool() {
return threadPool; return threadPool;
} }
@ -57,6 +53,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
} }
public void internalAddListener(ActionListener<T> listener) { public void internalAddListener(ActionListener<T> listener) {
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
boolean executeImmediate = false; boolean executeImmediate = false;
synchronized (this) { synchronized (this) {
if (executedListeners) { if (executedListeners) {
@ -101,27 +98,10 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
} }
private void executeListener(final ActionListener<T> listener) { private void executeListener(final ActionListener<T> listener) {
if (listenerThreaded) { try {
try { listener.onResponse(actionGet());
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() { } catch (Throwable e) {
@Override listener.onFailure(e);
public void run() {
try {
listener.onResponse(actionGet());
} catch (ElasticsearchException e) {
listener.onFailure(e);
}
}
});
} catch (EsRejectedExecutionException e) {
listener.onFailure(e);
}
} else {
try {
listener.onResponse(actionGet());
} catch (Throwable e) {
listener.onFailure(e);
}
} }
} }
} }

View File

@ -41,8 +41,6 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
@Override @Override
public final void messageReceived(final Request request, final TransportChannel channel) throws Exception { public final void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() { execute(request, new ActionListener<Response>() {
@Override @Override
public void onResponse(Response response) { public void onResponse(Response response) {

View File

@ -26,8 +26,8 @@ import org.elasticsearch.threadpool.ThreadPool;
*/ */
public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> { public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> {
public PlainListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) { public PlainListenableActionFuture(ThreadPool threadPool) {
super(listenerThreaded, threadPool); super(threadPool);
} }
@Override @Override

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.Future;
/**
* An action listener that wraps another action listener and threading its execution.
*/
public final class ThreadedActionListener<Response> implements ActionListener<Response> {
/**
* Wrapper that can be used to automatically wrap a listener in a threaded listener if needed.
*/
public static class Wrapper {
private final ESLogger logger;
private final ThreadPool threadPool;
private final boolean threadedListener;
public Wrapper(ESLogger logger, Settings settings, ThreadPool threadPool) {
this.logger = logger;
this.threadPool = threadPool;
// Should the action listener be threaded or not by default. Action listeners are automatically threaded for client
// nodes and transport client in order to make sure client side code is not executed on IO threads.
this.threadedListener = DiscoveryNode.clientNode(settings) || TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING));
}
public <Response> ActionListener<Response> wrap(ActionListener<Response> listener) {
if (threadedListener == false) {
return listener;
}
// if its a future, the callback is very lightweight (flipping a bit) so no need to wrap it
if (listener instanceof Future) {
return listener;
}
return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
}
}
private final ESLogger logger;
private final ThreadPool threadPool;
private final String executor;
private final ActionListener<Response> listener;
public ThreadedActionListener(ESLogger logger, ThreadPool threadPool, String executor, ActionListener<Response> listener) {
this.logger = logger;
this.threadPool = threadPool;
this.executor = executor;
this.listener = listener;
}
@Override
public void onResponse(final Response response) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
listener.onResponse(response);
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
}
@Override
public void onFailure(final Throwable e) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
listener.onFailure(e);
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to execute failure callback on [{}], failure [{}]", t, listener, e);
}
});
}
}

View File

@ -19,12 +19,10 @@
package org.elasticsearch.action.support; package org.elasticsearch.action.support;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -49,21 +47,11 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
public final ActionFuture<Response> execute(Request request) { public final ActionFuture<Response> execute(Request request) {
PlainActionFuture<Response> future = newFuture(); PlainActionFuture<Response> future = newFuture();
// since we don't have a listener, and we release a possible lock with the future
// there is no need to execute it under a listener thread
request.listenerThreaded(false);
execute(request, future); execute(request, future);
return future; return future;
} }
public final void execute(Request request, ActionListener<Response> listener) { public final void execute(Request request, ActionListener<Response> listener) {
if (forceThreadedListener()) {
request.listenerThreaded(true);
}
if (request.listenerThreaded()) {
listener = new ThreadedActionListener<>(threadPool, listener, logger);
}
ActionRequestValidationException validationException = request.validate(); ActionRequestValidationException validationException = request.validate();
if (validationException != null) { if (validationException != null) {
listener.onFailure(validationException); listener.onFailure(validationException);
@ -83,69 +71,8 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
} }
} }
protected boolean forceThreadedListener() {
return false;
}
protected abstract void doExecute(Request request, ActionListener<Response> listener); protected abstract void doExecute(Request request, ActionListener<Response> listener);
static final class ThreadedActionListener<Response> implements ActionListener<Response> {
private final ThreadPool threadPool;
private final ActionListener<Response> listener;
private final ESLogger logger;
ThreadedActionListener(ThreadPool threadPool, ActionListener<Response> listener, ESLogger logger) {
this.threadPool = threadPool;
this.listener = listener;
this.logger = logger;
}
@Override
public void onResponse(final Response response) {
try {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
@Override
public void run() {
try {
listener.onResponse(response);
} catch (Throwable e) {
listener.onFailure(e);
}
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run threaded action, execution rejected [{}] running on current thread", listener);
/* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
try {
listener.onResponse(response);
} catch (Throwable e) {
listener.onFailure(e);
}
}
}
@Override
public void onFailure(final Throwable e) {
try {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() {
@Override
public void run() {
listener.onFailure(e);
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Can not run threaded action, execution rejected for listener [{}] running on current thread", listener);
/* we don't care if that takes long since we are shutting down (or queue capacity). But if we not respond somebody could wait
* for the response on the listener side which could be a remote machine so make sure we push it out there.*/
listener.onFailure(e);
}
}
}
private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> implements ActionFilterChain { private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse> implements ActionFilterChain {
private final TransportAction<Request, Response> action; private final TransportAction<Request, Response> action;

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -75,14 +76,11 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
} }
@Override @Override
protected boolean forceThreadedListener() { protected void doExecute(final Request request, ActionListener<Response> listener) {
// since the callback is async, we typically can get called from within an event in the cluster service // TODO do we really need to wrap it in a listener? the handlers should be cheap
// or something similar, so make sure we are threaded so we won't block it. if ((listener instanceof ThreadedActionListener) == false) {
return true; listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
} }
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false); innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
} }

View File

@ -186,8 +186,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
class OperationTransportHandler implements TransportRequestHandler<Request> { class OperationTransportHandler implements TransportRequestHandler<Request> {
@Override @Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception { public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn // if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true); request.operationThreaded(true);
execute(request, new ActionListener<Response>() { execute(request, new ActionListener<Response>() {

View File

@ -232,8 +232,6 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
@Override @Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception { public void messageReceived(Request request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn // if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true); request.operationThreaded(true);
execute(request, new ActionListener<Response>() { execute(request, new ActionListener<Response>() {

View File

@ -21,6 +21,8 @@ package org.elasticsearch.client.node;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -28,6 +30,8 @@ import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -38,6 +42,7 @@ import java.util.Map;
*/ */
public class NodeClient extends AbstractClient { public class NodeClient extends AbstractClient {
private final ESLogger logger;
private final Settings settings; private final Settings settings;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -46,9 +51,11 @@ public class NodeClient extends AbstractClient {
private final ImmutableMap<ClientAction, TransportAction> actions; private final ImmutableMap<ClientAction, TransportAction> actions;
private final Headers headers; private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
@Inject @Inject
public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map<GenericAction, TransportAction> actions, Headers headers) { public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map<GenericAction, TransportAction> actions, Headers headers) {
this.logger = Loggers.getLogger(getClass(), settings);
this.settings = settings; this.settings = settings;
this.threadPool = threadPool; this.threadPool = threadPool;
this.admin = admin; this.admin = admin;
@ -60,6 +67,7 @@ public class NodeClient extends AbstractClient {
} }
} }
this.actions = actionsBuilder.immutableMap(); this.actions = actionsBuilder.immutableMap();
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
} }
@Override @Override
@ -84,16 +92,17 @@ public class NodeClient extends AbstractClient {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request) {
headers.applyTo(request); PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
TransportAction<Request, Response> transportAction = actions.get((ClientAction)action); execute(action, request, actionFuture);
return transportAction.execute(request); return actionFuture;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
headers.applyTo(request); headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
TransportAction<Request, Response> transportAction = actions.get((ClientAction)action); TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
transportAction.execute(request, listener); transportAction.execute(request, listener);
} }

View File

@ -22,12 +22,17 @@ package org.elasticsearch.client.node;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.cluster.ClusterAction; import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.support.AbstractClusterAdminClient; import org.elasticsearch.client.support.AbstractClusterAdminClient;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map; import java.util.Map;
@ -37,14 +42,15 @@ import java.util.Map;
*/ */
public class NodeClusterAdminClient extends AbstractClusterAdminClient implements ClusterAdminClient { public class NodeClusterAdminClient extends AbstractClusterAdminClient implements ClusterAdminClient {
private final ESLogger logger;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ImmutableMap<ClusterAction, TransportAction> actions; private final ImmutableMap<ClusterAction, TransportAction> actions;
private final Headers headers; private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
@Inject @Inject
public NodeClusterAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) { public NodeClusterAdminClient(Settings settings, ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
this.logger = Loggers.getLogger(getClass(), settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.headers = headers; this.headers = headers;
MapBuilder<ClusterAction, TransportAction> actionsBuilder = new MapBuilder<>(); MapBuilder<ClusterAction, TransportAction> actionsBuilder = new MapBuilder<>();
@ -54,6 +60,7 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
} }
} }
this.actions = actionsBuilder.immutableMap(); this.actions = actionsBuilder.immutableMap();
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
} }
@Override @Override
@ -63,16 +70,17 @@ public class NodeClusterAdminClient extends AbstractClusterAdminClient implement
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request) {
headers.applyTo(request); PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action); execute(action, request, actionFuture);
return transportAction.execute(request); return actionFuture;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(Action<Request, Response, RequestBuilder, ClusterAdminClient> action, Request request, ActionListener<Response> listener) {
headers.applyTo(request); headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action); TransportAction<Request, Response> transportAction = actions.get((ClusterAction)action);
transportAction.execute(request, listener); transportAction.execute(request, listener);
} }

View File

@ -22,12 +22,17 @@ package org.elasticsearch.client.node;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.indices.IndicesAction; import org.elasticsearch.action.admin.indices.IndicesAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.support.AbstractIndicesAdminClient; import org.elasticsearch.client.support.AbstractIndicesAdminClient;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map; import java.util.Map;
@ -37,14 +42,15 @@ import java.util.Map;
*/ */
public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implements IndicesAdminClient { public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implements IndicesAdminClient {
private final ESLogger logger;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ImmutableMap<IndicesAction, TransportAction> actions; private final ImmutableMap<IndicesAction, TransportAction> actions;
private final Headers headers; private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
@Inject @Inject
public NodeIndicesAdminClient(ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) { public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, Map<GenericAction, TransportAction> actions, Headers headers) {
this.logger = Loggers.getLogger(getClass(), settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.headers = headers; this.headers = headers;
MapBuilder<IndicesAction, TransportAction> actionsBuilder = new MapBuilder<>(); MapBuilder<IndicesAction, TransportAction> actionsBuilder = new MapBuilder<>();
@ -54,6 +60,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
} }
} }
this.actions = actionsBuilder.immutableMap(); this.actions = actionsBuilder.immutableMap();
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
} }
@Override @Override
@ -63,16 +70,17 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request) {
headers.applyTo(request); PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action); execute(action, request, actionFuture);
return transportAction.execute(request); return actionFuture;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(Action<Request, Response, RequestBuilder, IndicesAdminClient> action, Request request, ActionListener<Response> listener) {
headers.applyTo(request); headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action); TransportAction<Request, Response> transportAction = actions.get((IndicesAction)action);
transportAction.execute(request, listener); transportAction.execute(request, listener);
} }

View File

@ -93,7 +93,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
*/ */
public class TransportClient extends AbstractClient { public class TransportClient extends AbstractClient {
private static final String CLIENT_TYPE = "transport"; public static final String CLIENT_TYPE = "transport";
final Injector injector; final Injector injector;

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -38,11 +37,9 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -199,7 +196,7 @@ public class TransportClientNodesService extends AbstractComponent {
ImmutableList<DiscoveryNode> nodes = this.nodes; ImmutableList<DiscoveryNode> nodes = this.nodes;
ensureNodesAreAvailable(nodes); ensureNodesAreAvailable(nodes);
int index = getNodeNumber(); int index = getNodeNumber();
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, threadPool, logger); RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index);
DiscoveryNode node = nodes.get((index) % nodes.size()); DiscoveryNode node = nodes.get((index) % nodes.size());
try { try {
callback.doWithNode(node, retryListener); callback.doWithNode(node, retryListener);
@ -213,20 +210,15 @@ public class TransportClientNodesService extends AbstractComponent {
private final NodeListenerCallback<Response> callback; private final NodeListenerCallback<Response> callback;
private final ActionListener<Response> listener; private final ActionListener<Response> listener;
private final ImmutableList<DiscoveryNode> nodes; private final ImmutableList<DiscoveryNode> nodes;
private final ESLogger logger;
private final int index; private final int index;
private ThreadPool threadPool;
private volatile int i; private volatile int i;
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, int index) {
int index, ThreadPool threadPool, ESLogger logger) {
this.callback = callback; this.callback = callback;
this.listener = listener; this.listener = listener;
this.nodes = nodes; this.nodes = nodes;
this.index = index; this.index = index;
this.threadPool = threadPool;
this.logger = logger;
} }
@Override @Override
@ -239,38 +231,21 @@ public class TransportClientNodesService extends AbstractComponent {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i; int i = ++this.i;
if (i >= nodes.size()) { if (i >= nodes.size()) {
runFailureInListenerThreadPool(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e)); listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
} else { } else {
try { try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this); callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch(final Throwable t) { } catch(final Throwable t) {
// this exception can't come from the TransportService as it doesn't throw exceptions at all // this exception can't come from the TransportService as it doesn't throw exceptions at all
runFailureInListenerThreadPool(t); listener.onFailure(t);
} }
} }
} else { } else {
runFailureInListenerThreadPool(e); listener.onFailure(e);
} }
} }
// need to ensure to not block the netty I/O thread, in case of retry due to the node sampling
private void runFailureInListenerThreadPool(final Throwable t) {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
listener.onFailure(t);
}
@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Could not execute failure listener: [{}]", t, t.getMessage());
} else {
logger.error("Could not execute failure listener: [{}]", t.getMessage());
}
}
});
}
} }
public void close() { public void close() {
@ -505,7 +480,7 @@ public class TransportClientNodesService extends AbstractComponent {
} }
} }
public static interface NodeListenerCallback<Response> { public interface NodeListenerCallback<Response> {
void doWithNode(DiscoveryNode node, ActionListener<Response> listener); void doWithNode(DiscoveryNode node, ActionListener<Response> listener);
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.client.transport.support;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.support.AbstractClient;
@ -30,6 +31,8 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -41,21 +44,20 @@ import java.util.Map;
*/ */
public class InternalTransportClient extends AbstractClient { public class InternalTransportClient extends AbstractClient {
private final ESLogger logger;
private final Settings settings; private final Settings settings;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final TransportClientNodesService nodesService; private final TransportClientNodesService nodesService;
private final InternalTransportAdminClient adminClient; private final InternalTransportAdminClient adminClient;
private final ImmutableMap<Action, TransportActionNodeProxy> actions; private final ImmutableMap<Action, TransportActionNodeProxy> actions;
private final Headers headers; private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
@Inject @Inject
public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService, public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService,
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient, TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
Map<String, GenericAction> actions, Headers headers) { Map<String, GenericAction> actions, Headers headers) {
this.logger = Loggers.getLogger(getClass(), settings);
this.settings = settings; this.settings = settings;
this.threadPool = threadPool; this.threadPool = threadPool;
this.nodesService = nodesService; this.nodesService = nodesService;
@ -68,6 +70,7 @@ public class InternalTransportClient extends AbstractClient {
} }
} }
this.actions = actionsBuilder.immutableMap(); this.actions = actionsBuilder.immutableMap();
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
} }
@Override @Override
@ -102,6 +105,7 @@ public class InternalTransportClient extends AbstractClient {
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request, ActionListener<Response> listener) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request, ActionListener<Response> listener) {
headers.applyTo(request); headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action); final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() { nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override @Override

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.cluster.ClusterAction; import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.support.AbstractClusterAdminClient; import org.elasticsearch.client.support.AbstractClusterAdminClient;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
@ -30,6 +31,8 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -42,17 +45,17 @@ import java.util.Map;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class InternalTransportClusterAdminClient extends AbstractClusterAdminClient implements ClusterAdminClient { public class InternalTransportClusterAdminClient extends AbstractClusterAdminClient implements ClusterAdminClient {
private final ESLogger logger;
private final TransportClientNodesService nodesService; private final TransportClientNodesService nodesService;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ImmutableMap<ClusterAction, TransportActionNodeProxy> actions; private final ImmutableMap<ClusterAction, TransportActionNodeProxy> actions;
private final Headers headers; private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
@Inject @Inject
public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, TransportService transportService, public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, TransportService transportService,
Map<String, GenericAction> actions, Headers headers) { Map<String, GenericAction> actions, Headers headers) {
this.logger = Loggers.getLogger(getClass(), settings);
this.nodesService = nodesService; this.nodesService = nodesService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.headers = headers; this.headers = headers;
@ -63,6 +66,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
} }
} }
this.actions = actionsBuilder.immutableMap(); this.actions = actionsBuilder.immutableMap();
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
} }
@Override @Override
@ -80,8 +84,9 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request, final ActionListener<Response> listener) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> void execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request, ActionListener<Response> listener) {
headers.applyTo(request); headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action); final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() { nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override @Override

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.*; import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.indices.IndicesAction; import org.elasticsearch.action.admin.indices.IndicesAction;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.support.AbstractIndicesAdminClient; import org.elasticsearch.client.support.AbstractIndicesAdminClient;
import org.elasticsearch.client.support.Headers; import org.elasticsearch.client.support.Headers;
@ -30,6 +31,8 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -42,17 +45,17 @@ import java.util.Map;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminClient implements IndicesAdminClient { public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminClient implements IndicesAdminClient {
private final ESLogger logger;
private final TransportClientNodesService nodesService; private final TransportClientNodesService nodesService;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ImmutableMap<Action, TransportActionNodeProxy> actions; private final ImmutableMap<Action, TransportActionNodeProxy> actions;
private final Headers headers; private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
@Inject @Inject
public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, TransportService transportService, ThreadPool threadPool, public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, TransportService transportService, ThreadPool threadPool,
Map<String, GenericAction> actions, Headers headers) { Map<String, GenericAction> actions, Headers headers) {
this.logger = Loggers.getLogger(getClass(), settings);
this.nodesService = nodesService; this.nodesService = nodesService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.headers = headers; this.headers = headers;
@ -63,6 +66,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
} }
} }
this.actions = actionsBuilder.immutableMap(); this.actions = actionsBuilder.immutableMap();
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
} }
@Override @Override
@ -82,6 +86,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
@Override @Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request, ActionListener<Response> listener) { public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> void execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request, ActionListener<Response> listener) {
headers.applyTo(request); headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action); final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() { nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override @Override

View File

@ -50,7 +50,6 @@ public class RestClusterHealthAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index"))); ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local())); clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
clusterHealthRequest.listenerThreaded(false);
clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout())); clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout()));
clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout())); clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout()));
String waitForStatus = request.param("wait_for_status"); String waitForStatus = request.param("wait_for_status");

View File

@ -81,8 +81,6 @@ public class RestNodesInfoAction extends BaseRestHandler {
} }
final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(nodeIds); final NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(nodeIds);
nodesInfoRequest.listenerThreaded(false);
// shortcut, dont do checks if only all is specified // shortcut, dont do checks if only all is specified
if (metrics.size() == 1 && metrics.contains("_all")) { if (metrics.size() == 1 && metrics.contains("_all")) {
nodesInfoRequest.all(); nodesInfoRequest.all();

View File

@ -60,7 +60,6 @@ public class RestNodesStatsAction extends BaseRestHandler {
Set<String> metrics = Strings.splitStringByCommaToSet(request.param("metric", "_all")); Set<String> metrics = Strings.splitStringByCommaToSet(request.param("metric", "_all"));
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodesIds); NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodesIds);
nodesStatsRequest.listenerThreaded(false);
if (metrics.size() == 1 && metrics.contains("_all")) { if (metrics.size() == 1 && metrics.contains("_all")) {
nodesStatsRequest.all(); nodesStatsRequest.all();

View File

@ -45,7 +45,6 @@ public class RestDeleteRepositoryAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteRepositoryRequest deleteRepositoryRequest = deleteRepositoryRequest(request.param("repository")); DeleteRepositoryRequest deleteRepositoryRequest = deleteRepositoryRequest(request.param("repository"));
deleteRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRepositoryRequest.masterNodeTimeout())); deleteRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRepositoryRequest.masterNodeTimeout()));
deleteRepositoryRequest.listenerThreaded(false);
deleteRepositoryRequest.timeout(request.paramAsTime("timeout", deleteRepositoryRequest.timeout())); deleteRepositoryRequest.timeout(request.paramAsTime("timeout", deleteRepositoryRequest.timeout()));
deleteRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRepositoryRequest.masterNodeTimeout())); deleteRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteRepositoryRequest.masterNodeTimeout()));
client.admin().cluster().deleteRepository(deleteRepositoryRequest, new AcknowledgedRestListener<DeleteRepositoryResponse>(channel)); client.admin().cluster().deleteRepository(deleteRepositoryRequest, new AcknowledgedRestListener<DeleteRepositoryResponse>(channel));

View File

@ -47,7 +47,6 @@ public class RestPutRepositoryAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
PutRepositoryRequest putRepositoryRequest = putRepositoryRequest(request.param("repository")); PutRepositoryRequest putRepositoryRequest = putRepositoryRequest(request.param("repository"));
putRepositoryRequest.listenerThreaded(false);
putRepositoryRequest.source(request.content().toUtf8()); putRepositoryRequest.source(request.content().toUtf8());
putRepositoryRequest.verify(request.paramAsBoolean("verify", true)); putRepositoryRequest.verify(request.paramAsBoolean("verify", true));
putRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRepositoryRequest.masterNodeTimeout())); putRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRepositoryRequest.masterNodeTimeout()));

View File

@ -50,7 +50,6 @@ public class RestVerifyRepositoryAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
VerifyRepositoryRequest verifyRepositoryRequest = verifyRepositoryRequest(request.param("repository")); VerifyRepositoryRequest verifyRepositoryRequest = verifyRepositoryRequest(request.param("repository"));
verifyRepositoryRequest.listenerThreaded(false);
verifyRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", verifyRepositoryRequest.masterNodeTimeout())); verifyRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", verifyRepositoryRequest.masterNodeTimeout()));
verifyRepositoryRequest.timeout(request.paramAsTime("timeout", verifyRepositoryRequest.timeout())); verifyRepositoryRequest.timeout(request.paramAsTime("timeout", verifyRepositoryRequest.timeout()));
client.admin().cluster().verifyRepository(verifyRepositoryRequest, new RestToXContentListener<VerifyRepositoryResponse>(channel)); client.admin().cluster().verifyRepository(verifyRepositoryRequest, new RestToXContentListener<VerifyRepositoryResponse>(channel));

View File

@ -54,7 +54,6 @@ public class RestClusterRerouteAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest(); final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun())); clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
clusterRerouteRequest.explain(request.paramAsBoolean("explain", clusterRerouteRequest.explain())); clusterRerouteRequest.explain(request.paramAsBoolean("explain", clusterRerouteRequest.explain()));
clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout())); clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout()));

View File

@ -42,7 +42,6 @@ public class RestClusterGetSettingsAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest() ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest()
.listenerThreaded(false)
.routingTable(false) .routingTable(false)
.nodes(false); .nodes(false);
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));

View File

@ -46,7 +46,6 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest(); final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
clusterUpdateSettingsRequest.listenerThreaded(false);
clusterUpdateSettingsRequest.timeout(request.paramAsTime("timeout", clusterUpdateSettingsRequest.timeout())); clusterUpdateSettingsRequest.timeout(request.paramAsTime("timeout", clusterUpdateSettingsRequest.timeout()));
clusterUpdateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterUpdateSettingsRequest.masterNodeTimeout())); clusterUpdateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterUpdateSettingsRequest.masterNodeTimeout()));
Map<String, Object> source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose(); Map<String, Object> source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose();

View File

@ -53,7 +53,6 @@ public class RestClusterSearchShardsAction extends BaseRestHandler {
String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final ClusterSearchShardsRequest clusterSearchShardsRequest = Requests.clusterSearchShardsRequest(indices); final ClusterSearchShardsRequest clusterSearchShardsRequest = Requests.clusterSearchShardsRequest(indices);
clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local())); clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local()));
clusterSearchShardsRequest.listenerThreaded(false);
clusterSearchShardsRequest.types(Strings.splitStringByCommaToArray(request.param("type"))); clusterSearchShardsRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
clusterSearchShardsRequest.routing(request.param("routing")); clusterSearchShardsRequest.routing(request.param("routing"));

View File

@ -46,7 +46,6 @@ public class RestCreateSnapshotAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
CreateSnapshotRequest createSnapshotRequest = createSnapshotRequest(request.param("repository"), request.param("snapshot")); CreateSnapshotRequest createSnapshotRequest = createSnapshotRequest(request.param("repository"), request.param("snapshot"));
createSnapshotRequest.listenerThreaded(false);
createSnapshotRequest.source(request.content().toUtf8()); createSnapshotRequest.source(request.content().toUtf8());
createSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createSnapshotRequest.masterNodeTimeout())); createSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createSnapshotRequest.masterNodeTimeout()));
createSnapshotRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); createSnapshotRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));

View File

@ -57,7 +57,6 @@ public class RestClusterStateAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
final ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest(); final ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest();
clusterStateRequest.listenerThreaded(false);
clusterStateRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterStateRequest.indicesOptions())); clusterStateRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterStateRequest.indicesOptions()));
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));

View File

@ -43,7 +43,6 @@ public class RestClusterStatsAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.listenerThreaded(false);
client.admin().cluster().clusterStats(clusterStatsRequest, new RestToXContentListener<ClusterStatsResponse>(channel)); client.admin().cluster().clusterStats(clusterStatsRequest, new RestToXContentListener<ClusterStatsResponse>(channel));
} }
} }

View File

@ -49,7 +49,6 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
indicesAliasesRequest.listenerThreaded(false);
indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout())); indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout()));
try (XContentParser parser = XContentFactory.xContent(request.content()).createParser(request.content())) { try (XContentParser parser = XContentFactory.xContent(request.content()).createParser(request.content())) {
// { // {

View File

@ -61,7 +61,6 @@ public class RestGetIndicesAliasesAction extends BaseRestHandler {
.nodes(false) .nodes(false)
.indices(indices); .indices(indices);
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.listenerThreaded(false);
client.admin().cluster().state(clusterStateRequest, new RestBuilderListener<ClusterStateResponse>(channel) { client.admin().cluster().state(clusterStateRequest, new RestBuilderListener<ClusterStateResponse>(channel) {
@Override @Override

View File

@ -59,7 +59,6 @@ public class RestAnalyzeAction extends BaseRestHandler {
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index")); AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"));
analyzeRequest.text(text); analyzeRequest.text(text);
analyzeRequest.listenerThreaded(false);
analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard())); analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard()));
analyzeRequest.analyzer(request.param("analyzer")); analyzeRequest.analyzer(request.param("analyzer"));
analyzeRequest.field(request.param("field")); analyzeRequest.field(request.param("field"));

View File

@ -56,7 +56,6 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(Strings.splitStringByCommaToArray(request.param("index"))); ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(Strings.splitStringByCommaToArray(request.param("index")));
clearIndicesCacheRequest.listenerThreaded(false);
clearIndicesCacheRequest.indicesOptions(IndicesOptions.fromRequest(request, clearIndicesCacheRequest.indicesOptions())); clearIndicesCacheRequest.indicesOptions(IndicesOptions.fromRequest(request, clearIndicesCacheRequest.indicesOptions()));
fromRequest(request, clearIndicesCacheRequest); fromRequest(request, clearIndicesCacheRequest);
client.admin().indices().clearCache(clearIndicesCacheRequest, new RestBuilderListener<ClearIndicesCacheResponse>(channel) { client.admin().indices().clearCache(clearIndicesCacheRequest, new RestBuilderListener<ClearIndicesCacheResponse>(channel) {

View File

@ -44,7 +44,6 @@ public class RestCloseIndexAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(Strings.splitStringByCommaToArray(request.param("index"))); CloseIndexRequest closeIndexRequest = new CloseIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
closeIndexRequest.listenerThreaded(false);
closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout())); closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout()));
closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout())); closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout()));
closeIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, closeIndexRequest.indicesOptions())); closeIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, closeIndexRequest.indicesOptions()));

View File

@ -43,7 +43,6 @@ public class RestCreateIndexAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request.param("index")); CreateIndexRequest createIndexRequest = new CreateIndexRequest(request.param("index"));
createIndexRequest.listenerThreaded(false);
if (request.hasContent()) { if (request.hasContent()) {
createIndexRequest.source(request.content()); createIndexRequest.source(request.content());
} }

View File

@ -44,7 +44,6 @@ public class RestDeleteIndexAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(Strings.splitStringByCommaToArray(request.param("index"))); DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
deleteIndexRequest.listenerThreaded(false);
deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout())); deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout()));
deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout())); deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions())); deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions()));

View File

@ -49,7 +49,6 @@ public class RestIndicesExistsAction extends BaseRestHandler {
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(Strings.splitStringByCommaToArray(request.param("index"))); IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(Strings.splitStringByCommaToArray(request.param("index")));
indicesExistsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesExistsRequest.indicesOptions())); indicesExistsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesExistsRequest.indicesOptions()));
indicesExistsRequest.local(request.paramAsBoolean("local", indicesExistsRequest.local())); indicesExistsRequest.local(request.paramAsBoolean("local", indicesExistsRequest.local()));
indicesExistsRequest.listenerThreaded(false);
client.admin().indices().exists(indicesExistsRequest, new RestResponseListener<IndicesExistsResponse>(channel) { client.admin().indices().exists(indicesExistsRequest, new RestResponseListener<IndicesExistsResponse>(channel) {
@Override @Override
public RestResponse buildResponse(IndicesExistsResponse response) { public RestResponse buildResponse(IndicesExistsResponse response) {

View File

@ -48,7 +48,6 @@ public class RestTypesExistsAction extends BaseRestHandler {
TypesExistsRequest typesExistsRequest = new TypesExistsRequest( TypesExistsRequest typesExistsRequest = new TypesExistsRequest(
Strings.splitStringByCommaToArray(request.param("index")), Strings.splitStringByCommaToArray(request.param("type")) Strings.splitStringByCommaToArray(request.param("index")), Strings.splitStringByCommaToArray(request.param("type"))
); );
typesExistsRequest.listenerThreaded(false);
typesExistsRequest.local(request.paramAsBoolean("local", typesExistsRequest.local())); typesExistsRequest.local(request.paramAsBoolean("local", typesExistsRequest.local()));
typesExistsRequest.indicesOptions(IndicesOptions.fromRequest(request, typesExistsRequest.indicesOptions())); typesExistsRequest.indicesOptions(IndicesOptions.fromRequest(request, typesExistsRequest.indicesOptions()));
client.admin().indices().typesExists(typesExistsRequest, new RestResponseListener<TypesExistsResponse>(channel) { client.admin().indices().typesExists(typesExistsRequest, new RestResponseListener<TypesExistsResponse>(channel) {

View File

@ -53,7 +53,6 @@ public class RestFlushAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index"))); FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
flushRequest.listenerThreaded(false);
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions())); flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
flushRequest.force(request.paramAsBoolean("force", flushRequest.force())); flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing())); flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));

View File

@ -67,7 +67,6 @@ public class RestPutMappingAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
PutMappingRequest putMappingRequest = putMappingRequest(Strings.splitStringByCommaToArray(request.param("index"))); PutMappingRequest putMappingRequest = putMappingRequest(Strings.splitStringByCommaToArray(request.param("index")));
putMappingRequest.listenerThreaded(false);
putMappingRequest.type(request.param("type")); putMappingRequest.type(request.param("type"));
putMappingRequest.source(request.content().toUtf8()); putMappingRequest.source(request.content().toUtf8());
putMappingRequest.timeout(request.paramAsTime("timeout", putMappingRequest.timeout())); putMappingRequest.timeout(request.paramAsTime("timeout", putMappingRequest.timeout()));

View File

@ -44,7 +44,6 @@ public class RestOpenIndexAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
OpenIndexRequest openIndexRequest = new OpenIndexRequest(Strings.splitStringByCommaToArray(request.param("index"))); OpenIndexRequest openIndexRequest = new OpenIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
openIndexRequest.listenerThreaded(false);
openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout())); openIndexRequest.timeout(request.paramAsTime("timeout", openIndexRequest.timeout()));
openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout())); openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
openIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, openIndexRequest.indicesOptions())); openIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, openIndexRequest.indicesOptions()));

View File

@ -53,7 +53,6 @@ public class RestOptimizeAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index"))); OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeRequest.listenerThreaded(false);
optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions())); optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions()));
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments())); optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes())); optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));

View File

@ -51,7 +51,6 @@ public class RestRecoveryAction extends BaseRestHandler {
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index"))); final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
recoveryRequest.detailed(request.paramAsBoolean("detailed", false)); recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false)); recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
recoveryRequest.listenerThreaded(false);
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions())); recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
client.admin().indices().recoveries(recoveryRequest, new RestBuilderListener<RecoveryResponse>(channel) { client.admin().indices().recoveries(recoveryRequest, new RestBuilderListener<RecoveryResponse>(channel) {

View File

@ -53,7 +53,6 @@ public class RestRefreshAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index"))); RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));
refreshRequest.listenerThreaded(false);
refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions())); refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));
client.admin().indices().refresh(refreshRequest, new RestBuilderListener<RefreshResponse>(channel) { client.admin().indices().refresh(refreshRequest, new RestBuilderListener<RefreshResponse>(channel) {
@Override @Override

View File

@ -49,7 +49,6 @@ public class RestIndicesSegmentsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(Strings.splitStringByCommaToArray(request.param("index"))); IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(Strings.splitStringByCommaToArray(request.param("index")));
indicesSegmentsRequest.verbose(request.paramAsBoolean("verbose", false)); indicesSegmentsRequest.verbose(request.paramAsBoolean("verbose", false));
indicesSegmentsRequest.listenerThreaded(false);
indicesSegmentsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesSegmentsRequest.indicesOptions())); indicesSegmentsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesSegmentsRequest.indicesOptions()));
client.admin().indices().segments(indicesSegmentsRequest, new RestBuilderListener<IndicesSegmentResponse>(channel) { client.admin().indices().segments(indicesSegmentsRequest, new RestBuilderListener<IndicesSegmentResponse>(channel) {
@Override @Override

View File

@ -55,7 +55,6 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index"))); UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(Strings.splitStringByCommaToArray(request.param("index")));
updateSettingsRequest.listenerThreaded(false);
updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout())); updateSettingsRequest.timeout(request.paramAsTime("timeout", updateSettingsRequest.timeout()));
updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout())); updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout()));
updateSettingsRequest.indicesOptions(IndicesOptions.fromRequest(request, updateSettingsRequest.indicesOptions())); updateSettingsRequest.indicesOptions(IndicesOptions.fromRequest(request, updateSettingsRequest.indicesOptions()));

View File

@ -53,7 +53,6 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesStatsRequest.indicesOptions())); indicesStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesStatsRequest.indicesOptions()));
indicesStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("index"))); indicesStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
indicesStatsRequest.types(Strings.splitStringByCommaToArray(request.param("types"))); indicesStatsRequest.types(Strings.splitStringByCommaToArray(request.param("types")));

View File

@ -40,7 +40,6 @@ public class RestDeleteIndexTemplateAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(request.param("name")); DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(request.param("name"));
deleteIndexTemplateRequest.listenerThreaded(false);
deleteIndexTemplateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexTemplateRequest.masterNodeTimeout())); deleteIndexTemplateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexTemplateRequest.masterNodeTimeout()));
client.admin().indices().deleteTemplate(deleteIndexTemplateRequest, new AcknowledgedRestListener<DeleteIndexTemplateResponse>(channel)); client.admin().indices().deleteTemplate(deleteIndexTemplateRequest, new AcknowledgedRestListener<DeleteIndexTemplateResponse>(channel));
} }

View File

@ -58,8 +58,6 @@ public class RestGetIndexTemplateAction extends BaseRestHandler {
getIndexTemplatesRequest.local(request.paramAsBoolean("local", getIndexTemplatesRequest.local())); getIndexTemplatesRequest.local(request.paramAsBoolean("local", getIndexTemplatesRequest.local()));
getIndexTemplatesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexTemplatesRequest.masterNodeTimeout())); getIndexTemplatesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexTemplatesRequest.masterNodeTimeout()));
getIndexTemplatesRequest.listenerThreaded(false);
final boolean implicitAll = getIndexTemplatesRequest.names().length == 0; final boolean implicitAll = getIndexTemplatesRequest.names().length == 0;
client.admin().indices().getTemplates(getIndexTemplatesRequest, new RestBuilderListener<GetIndexTemplatesResponse>(channel) { client.admin().indices().getTemplates(getIndexTemplatesRequest, new RestBuilderListener<GetIndexTemplatesResponse>(channel) {

View File

@ -42,7 +42,6 @@ public class RestPutIndexTemplateAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
PutIndexTemplateRequest putRequest = new PutIndexTemplateRequest(request.param("name")); PutIndexTemplateRequest putRequest = new PutIndexTemplateRequest(request.param("name"));
putRequest.listenerThreaded(false);
putRequest.template(request.param("template", putRequest.template())); putRequest.template(request.param("template", putRequest.template()));
putRequest.order(request.paramAsInt("order", putRequest.order())); putRequest.order(request.paramAsInt("order", putRequest.order()));
putRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRequest.masterNodeTimeout())); putRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRequest.masterNodeTimeout()));

View File

@ -57,7 +57,6 @@ public class RestValidateQueryAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(Strings.splitStringByCommaToArray(request.param("index"))); ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(Strings.splitStringByCommaToArray(request.param("index")));
validateQueryRequest.listenerThreaded(false);
validateQueryRequest.indicesOptions(IndicesOptions.fromRequest(request, validateQueryRequest.indicesOptions())); validateQueryRequest.indicesOptions(IndicesOptions.fromRequest(request, validateQueryRequest.indicesOptions()));
if (RestActions.hasBodyContent(request)) { if (RestActions.hasBodyContent(request)) {
validateQueryRequest.source(RestActions.getRestContent(request)); validateQueryRequest.source(RestActions.getRestContent(request));

View File

@ -47,7 +47,6 @@ public class RestDeleteWarmerAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(Strings.splitStringByCommaToArray(request.param("name"))) DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(Strings.splitStringByCommaToArray(request.param("name")))
.indices(Strings.splitStringByCommaToArray(request.param("index"))); .indices(Strings.splitStringByCommaToArray(request.param("index")));
deleteWarmerRequest.listenerThreaded(false);
deleteWarmerRequest.timeout(request.paramAsTime("timeout", deleteWarmerRequest.timeout())); deleteWarmerRequest.timeout(request.paramAsTime("timeout", deleteWarmerRequest.timeout()));
deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout())); deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout()));
deleteWarmerRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteWarmerRequest.indicesOptions())); deleteWarmerRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteWarmerRequest.indicesOptions()));

View File

@ -59,7 +59,6 @@ public class RestPutWarmerAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
PutWarmerRequest putWarmerRequest = new PutWarmerRequest(request.param("name")); PutWarmerRequest putWarmerRequest = new PutWarmerRequest(request.param("name"));
putWarmerRequest.listenerThreaded(false);
SearchRequest searchRequest = new SearchRequest(Strings.splitStringByCommaToArray(request.param("index"))) SearchRequest searchRequest = new SearchRequest(Strings.splitStringByCommaToArray(request.param("index")))
.types(Strings.splitStringByCommaToArray(request.param("type"))) .types(Strings.splitStringByCommaToArray(request.param("type")))
.queryCache(request.paramAsBoolean("query_cache", null)) .queryCache(request.paramAsBoolean("query_cache", null))

View File

@ -71,7 +71,6 @@ public class RestBulkAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
BulkRequest bulkRequest = Requests.bulkRequest(); BulkRequest bulkRequest = Requests.bulkRequest();
bulkRequest.listenerThreaded(false);
String defaultIndex = request.param("index"); String defaultIndex = request.param("index");
String defaultType = request.param("type"); String defaultType = request.param("type");
String defaultRouting = request.param("routing"); String defaultRouting = request.param("routing");

View File

@ -68,7 +68,6 @@ public class RestRecoveryAction extends AbstractCatAction {
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index"))); final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
recoveryRequest.detailed(request.paramAsBoolean("detailed", false)); recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false)); recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
recoveryRequest.listenerThreaded(false);
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions())); recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
client.admin().indices().recoveries(recoveryRequest, new RestResponseListener<RecoveryResponse>(channel) { client.admin().indices().recoveries(recoveryRequest, new RestResponseListener<RecoveryResponse>(channel) {

View File

@ -58,7 +58,6 @@ public class RestCountAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
CountRequest countRequest = new CountRequest(Strings.splitStringByCommaToArray(request.param("index"))); CountRequest countRequest = new CountRequest(Strings.splitStringByCommaToArray(request.param("index")));
countRequest.indicesOptions(IndicesOptions.fromRequest(request, countRequest.indicesOptions())); countRequest.indicesOptions(IndicesOptions.fromRequest(request, countRequest.indicesOptions()));
countRequest.listenerThreaded(false);
if (RestActions.hasBodyContent(request)) { if (RestActions.hasBodyContent(request)) {
countRequest.source(RestActions.getRestContent(request)); countRequest.source(RestActions.getRestContent(request));
} else { } else {

View File

@ -51,7 +51,6 @@ public class RestDeleteAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id")); DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
deleteRequest.listenerThreaded(false);
deleteRequest.operationThreaded(true); deleteRequest.operationThreaded(true);
deleteRequest.routing(request.param("routing")); deleteRequest.routing(request.param("routing"));

View File

@ -48,7 +48,6 @@ public class RestExistsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
final ExistsRequest existsRequest = new ExistsRequest(Strings.splitStringByCommaToArray(request.param("index"))); final ExistsRequest existsRequest = new ExistsRequest(Strings.splitStringByCommaToArray(request.param("index")));
existsRequest.indicesOptions(IndicesOptions.fromRequest(request, existsRequest.indicesOptions())); existsRequest.indicesOptions(IndicesOptions.fromRequest(request, existsRequest.indicesOptions()));
existsRequest.listenerThreaded(false);
if (RestActions.hasBodyContent(request)) { if (RestActions.hasBodyContent(request)) {
existsRequest.source(RestActions.getRestContent(request)); existsRequest.source(RestActions.getRestContent(request));
} else { } else {

View File

@ -57,7 +57,6 @@ public class RestFieldStatsAction extends BaseRestHandler {
fieldStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldStatsRequest.indicesOptions())); fieldStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, fieldStatsRequest.indicesOptions()));
fieldStatsRequest.fields(Strings.splitStringByCommaToArray(request.param("fields"))); fieldStatsRequest.fields(Strings.splitStringByCommaToArray(request.param("fields")));
fieldStatsRequest.level(request.param("level", FieldStatsRequest.DEFAULT_LEVEL)); fieldStatsRequest.level(request.param("level", FieldStatsRequest.DEFAULT_LEVEL));
fieldStatsRequest.listenerThreaded(false);
client.fieldStats(fieldStatsRequest, new RestBuilderListener<FieldStatsResponse>(channel) { client.fieldStats(fieldStatsRequest, new RestBuilderListener<FieldStatsResponse>(channel) {
@Override @Override

View File

@ -50,7 +50,6 @@ public class RestGetAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id")); final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
getRequest.listenerThreaded(false);
getRequest.operationThreaded(true); getRequest.operationThreaded(true);
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh())); getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing

View File

@ -51,7 +51,6 @@ public class RestGetSourceAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id")); final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
getRequest.listenerThreaded(false);
getRequest.operationThreaded(true); getRequest.operationThreaded(true);
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh())); getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing

View File

@ -47,7 +47,6 @@ public class RestHeadAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id")); final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
getRequest.listenerThreaded(false);
getRequest.operationThreaded(true); getRequest.operationThreaded(true);
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh())); getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing

View File

@ -53,7 +53,6 @@ public class RestMultiGetAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest(); MultiGetRequest multiGetRequest = new MultiGetRequest();
multiGetRequest.listenerThreaded(false);
multiGetRequest.refresh(request.paramAsBoolean("refresh", multiGetRequest.refresh())); multiGetRequest.refresh(request.paramAsBoolean("refresh", multiGetRequest.refresh()));
multiGetRequest.preference(request.param("preference")); multiGetRequest.preference(request.param("preference"));
multiGetRequest.realtime(request.paramAsBoolean("realtime", null)); multiGetRequest.realtime(request.paramAsBoolean("realtime", null));

View File

@ -70,7 +70,6 @@ public class RestIndexAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id")); IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(true); indexRequest.operationThreaded(true);
indexRequest.routing(request.param("routing")); indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing

View File

@ -50,8 +50,6 @@ public class RestMoreLikeThisAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
MoreLikeThisRequest mltRequest = moreLikeThisRequest(request.param("index")).type(request.param("type")).id(request.param("id")); MoreLikeThisRequest mltRequest = moreLikeThisRequest(request.param("index")).type(request.param("type")).id(request.param("id"));
mltRequest.routing(request.param("routing")); mltRequest.routing(request.param("routing"));
mltRequest.listenerThreaded(false);
//TODO the ParseField class that encapsulates the supported names used for an attribute //TODO the ParseField class that encapsulates the supported names used for an attribute
//needs some work if it is to be used in a REST context like this too //needs some work if it is to be used in a REST context like this too
// See the MoreLikeThisQueryParser constants that hold the valid syntax // See the MoreLikeThisQueryParser constants that hold the valid syntax

View File

@ -94,8 +94,6 @@ public class RestPercolateAction extends BaseRestHandler {
} }
void executePercolate(final PercolateRequest percolateRequest, final RestChannel restChannel, final Client client) { void executePercolate(final PercolateRequest percolateRequest, final RestChannel restChannel, final Client client) {
// we just send a response, no need to fork
percolateRequest.listenerThreaded(false);
client.percolate(percolateRequest, new RestToXContentListener<PercolateResponse>(restChannel)); client.percolate(percolateRequest, new RestToXContentListener<PercolateResponse>(restChannel));
} }

View File

@ -75,7 +75,7 @@ public class RestPutIndexedScriptAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, Client client) {
PutIndexedScriptRequest putRequest = new PutIndexedScriptRequest(getScriptLang(request), request.param("id")).listenerThreaded(false); PutIndexedScriptRequest putRequest = new PutIndexedScriptRequest(getScriptLang(request), request.param("id"));
putRequest.version(request.paramAsLong("version", putRequest.version())); putRequest.version(request.paramAsLong("version", putRequest.version()));
putRequest.versionType(VersionType.fromString(request.param("version_type"), putRequest.versionType())); putRequest.versionType(VersionType.fromString(request.param("version_type"), putRequest.versionType()));
putRequest.source(request.content()); putRequest.source(request.content());

View File

@ -56,7 +56,6 @@ public class RestMultiSearchAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
multiSearchRequest.listenerThreaded(false);
String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
String[] types = Strings.splitStringByCommaToArray(request.param("type")); String[] types = Strings.splitStringByCommaToArray(request.param("type"));

View File

@ -77,7 +77,6 @@ public class RestSearchAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
SearchRequest searchRequest; SearchRequest searchRequest;
searchRequest = RestSearchAction.parseSearchRequest(request); searchRequest = RestSearchAction.parseSearchRequest(request);
searchRequest.listenerThreaded(false);
client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel)); client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel));
} }

View File

@ -60,7 +60,6 @@ public class RestSearchScrollAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String scrollId = request.param("scroll_id"); String scrollId = request.param("scroll_id");
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(); SearchScrollRequest searchScrollRequest = new SearchScrollRequest();
searchScrollRequest.listenerThreaded(false);
searchScrollRequest.scrollId(scrollId); searchScrollRequest.scrollId(scrollId);
String scroll = request.param("scroll"); String scroll = request.param("scroll");
if (scroll != null) { if (scroll != null) {

View File

@ -59,7 +59,6 @@ public class RestSuggestAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
SuggestRequest suggestRequest = new SuggestRequest(Strings.splitStringByCommaToArray(request.param("index"))); SuggestRequest suggestRequest = new SuggestRequest(Strings.splitStringByCommaToArray(request.param("index")));
suggestRequest.indicesOptions(IndicesOptions.fromRequest(request, suggestRequest.indicesOptions())); suggestRequest.indicesOptions(IndicesOptions.fromRequest(request, suggestRequest.indicesOptions()));
suggestRequest.listenerThreaded(false);
if (RestActions.hasBodyContent(request)) { if (RestActions.hasBodyContent(request)) {
suggestRequest.suggest(RestActions.getRestContent(request)); suggestRequest.suggest(RestActions.getRestContent(request));
} else { } else {

View File

@ -49,7 +49,6 @@ public class RestMultiTermVectorsAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
MultiTermVectorsRequest multiTermVectorsRequest = new MultiTermVectorsRequest(); MultiTermVectorsRequest multiTermVectorsRequest = new MultiTermVectorsRequest();
multiTermVectorsRequest.listenerThreaded(false);
TermVectorsRequest template = new TermVectorsRequest(); TermVectorsRequest template = new TermVectorsRequest();
template.index(request.param("index")); template.index(request.param("index"));
template.type(request.param("type")); template.type(request.param("type"));

View File

@ -55,7 +55,6 @@ public class RestUpdateAction extends BaseRestHandler {
@Override @Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
UpdateRequest updateRequest = new UpdateRequest(request.param("index"), request.param("type"), request.param("id")); UpdateRequest updateRequest = new UpdateRequest(request.param("index"), request.param("type"), request.param("id"));
updateRequest.listenerThreaded(false);
updateRequest.routing(request.param("routing")); updateRequest.routing(request.param("routing"));
updateRequest.parent(request.param("parent")); updateRequest.parent(request.param("parent"));
updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout())); updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -231,7 +232,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
logger.trace("river {} is already allocated", routing.riverName().getName()); logger.trace("river {} is already allocated", routing.riverName().getName());
continue; continue;
} }
prepareGetMetaDocument(routing.riverName().name()).execute(new ActionListener<GetResponse>() { prepareGetMetaDocument(routing.riverName().name()).execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, new ActionListener<GetResponse>() {
@Override @Override
public void onResponse(GetResponse getResponse) { public void onResponse(GetResponse getResponse) {
if (!rivers.containsKey(routing.riverName())) { if (!rivers.containsKey(routing.riverName())) {
@ -255,7 +256,7 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name()); logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
final ActionListener<GetResponse> listener = this; final ActionListener<GetResponse> listener = this;
try { try {
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() { threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.LISTENER, new Runnable() {
@Override @Override
public void run() { public void run() {
prepareGetMetaDocument(routing.riverName().name()).execute(listener); prepareGetMetaDocument(routing.riverName().name()).execute(listener);
@ -268,12 +269,12 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name()); logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
} }
} }
}); }));
} }
} }
private GetRequestBuilder prepareGetMetaDocument(String riverName) { private GetRequestBuilder prepareGetMetaDocument(String riverName) {
return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary").setListenerThreaded(true); return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary");
} }
} }
} }

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class ListenerActionTests extends ElasticsearchIntegrationTest {
@Test
public void verifyThreadedListeners() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final AtomicReference<String> threadName = new AtomicReference<>();
Client client = client();
IndexRequest request = new IndexRequest("test", "type", "1");
if (randomBoolean()) {
// set the source, without it, we will have a verification failure
request.source("field1", "value1");
}
client.index(request, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
threadName.set(Thread.currentThread().getName());
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
threadName.set(Thread.currentThread().getName());
failure.set(e);
latch.countDown();
}
});
latch.await();
boolean shouldBeThreaded = DiscoveryNode.clientNode(client.settings()) || TransportClient.CLIENT_TYPE.equals(client.settings().get(Client.CLIENT_TYPE_SETTING));
if (shouldBeThreaded) {
assertTrue(threadName.get().contains("listener"));
} else {
assertFalse(threadName.get().contains("listener"));
}
}
}

View File

@ -92,7 +92,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
} }
} }
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(false, null); PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(null);
transportAction.execute(new TestRequest(), future); transportAction.execute(new TestRequest(), future);
try { try {
assertThat(future.get(), notNullValue()); assertThat(future.get(), notNullValue());
@ -174,7 +174,7 @@ public class TransportActionFilterChainTests extends ElasticsearchTestCase {
} }
} }
PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(false, null); PlainListenableActionFuture<TestResponse> future = new PlainListenableActionFuture<>(null);
transportAction.execute(new TestRequest(), future); transportAction.execute(new TestRequest(), future);
try { try {
assertThat(future.get(), notNullValue()); assertThat(future.get(), notNullValue());

View File

@ -51,6 +51,7 @@ import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportMessage;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -84,16 +85,19 @@ public abstract class AbstractClientHeadersTests extends ElasticsearchTestCase {
CreateIndexAction.INSTANCE, IndicesStatsAction.INSTANCE, ClearIndicesCacheAction.INSTANCE, FlushAction.INSTANCE CreateIndexAction.INSTANCE, IndicesStatsAction.INSTANCE, ClearIndicesCacheAction.INSTANCE, FlushAction.INSTANCE
}; };
protected ThreadPool threadPool;
private Client client; private Client client;
@Before @Before
public void initClient() { public void initClient() {
threadPool = new ThreadPool("test-" + getTestName());
client = buildClient(HEADER_SETTINGS, ACTIONS); client = buildClient(HEADER_SETTINGS, ACTIONS);
} }
@After @After
public void cleanupClient() { public void cleanupClient() throws Exception {
client.close(); client.close();
terminate(threadPool);
} }
protected abstract Client buildClient(Settings headersSettings, GenericAction[] testedActions); protected abstract Client buildClient(Settings headersSettings, GenericAction[] testedActions);

View File

@ -42,18 +42,6 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTests {
private static final ActionFilters EMPTY_FILTERS = new ActionFilters(ImmutableSet.of()); private static final ActionFilters EMPTY_FILTERS = new ActionFilters(ImmutableSet.of());
private ThreadPool threadPool;
@Before
public void init() {
threadPool = new ThreadPool("test");
}
@After
public void cleanup() throws InterruptedException {
terminate(threadPool);
}
@Override @Override
protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) { protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) {
Settings settings = HEADER_SETTINGS; Settings settings = HEADER_SETTINGS;
@ -61,8 +49,8 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTests {
Headers headers = new Headers(settings); Headers headers = new Headers(settings);
Actions actions = new Actions(settings, threadPool, testedActions); Actions actions = new Actions(settings, threadPool, testedActions);
NodeClusterAdminClient clusterClient = new NodeClusterAdminClient(threadPool, actions, headers); NodeClusterAdminClient clusterClient = new NodeClusterAdminClient(settings, threadPool, actions, headers);
NodeIndicesAdminClient indicesClient = new NodeIndicesAdminClient(threadPool, actions, headers); NodeIndicesAdminClient indicesClient = new NodeIndicesAdminClient(settings, threadPool, actions, headers);
NodeAdminClient adminClient = new NodeAdminClient(settings, clusterClient, indicesClient); NodeAdminClient adminClient = new NodeAdminClient(settings, clusterClient, indicesClient);
return new NodeClient(settings, threadPool, adminClient, actions, headers); return new NodeClient(settings, threadPool, adminClient, actions, headers);
} }

View File

@ -84,7 +84,7 @@ public class TransportClientRetryTests extends ElasticsearchIntegrationTest {
if (randomBoolean()) { if (randomBoolean()) {
clusterState = transportClient.admin().cluster().state(clusterStateRequest).get().getState(); clusterState = transportClient.admin().cluster().state(clusterStateRequest).get().getState();
} else { } else {
PlainListenableActionFuture<ClusterStateResponse> future = new PlainListenableActionFuture<>(clusterStateRequest.listenerThreaded(), transportClient.threadPool()); PlainListenableActionFuture<ClusterStateResponse> future = new PlainListenableActionFuture<>(transportClient.threadPool());
transportClient.admin().cluster().state(clusterStateRequest, future); transportClient.admin().cluster().state(clusterStateRequest, future);
clusterState = future.get().getState(); clusterState = future.get().getState();
} }