explicitly set listener to false when applicable

This commit is contained in:
Shay Banon 2012-06-01 11:25:55 +02:00
parent 98d2e7c031
commit a0d1b9b695
46 changed files with 64 additions and 31 deletions

View File

@ -127,6 +127,12 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
return this.local;
}
@Override
public ClusterStateRequest listenerThreaded(boolean listenerThreaded) {
super.listenerThreaded(listenerThreaded);
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -87,6 +87,8 @@ public class TransportSearchScrollAction extends TransportAction<SearchScrollReq
@Override
public void messageReceived(SearchScrollRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener
request.listenerThreaded(false);
execute(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse result) {

View File

@ -51,6 +51,7 @@ public class RestClusterHealthAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(RestActions.splitIndices(request.param("index")));
clusterHealthRequest.listenerThreaded(false);
int level = 0;
try {
clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout()));

View File

@ -45,6 +45,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
client.admin().cluster().reroute(clusterRerouteRequest, new ActionListener<ClusterRerouteResponse>() {
@Override
public void onResponse(ClusterRerouteResponse response) {

View File

@ -46,6 +46,7 @@ public class RestClusterGetSettingsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest()
.listenerThreaded(false)
.filterRoutingTable(true)
.filterNodes(true);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {

View File

@ -46,7 +46,7 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
clusterUpdateSettingsRequest.listenerThreaded(false);
try {
XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());
Map<String, Object> source = XContentFactory.xContent(xContentType)

View File

@ -72,6 +72,7 @@ public class RestClusterStateAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest();
clusterStateRequest.listenerThreaded(false);
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
clusterStateRequest.filterNodes(request.paramAsBoolean("filter_nodes", clusterStateRequest.filterNodes()));
clusterStateRequest.filterRoutingTable(request.paramAsBoolean("filter_routing_table", clusterStateRequest.filterRoutingTable()));

View File

@ -61,6 +61,8 @@ public class RestGetIndicesAliasesAction extends BaseRestHandler {
.filterNodes(true)
.filteredIndices(indices);
clusterStateRequest.listenerThreaded(false);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse response) {

View File

@ -55,6 +55,7 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
indicesAliasesRequest.listenerThreaded(false);
try {
// {
// actions : [

View File

@ -66,6 +66,7 @@ public class RestAnalyzeAction extends BaseRestHandler {
}
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"), text);
analyzeRequest.listenerThreaded(false);
analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard()));
analyzeRequest.analyzer(request.param("analyzer"));
analyzeRequest.field(request.param("field"));

View File

@ -57,6 +57,7 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(RestActions.splitIndices(request.param("index")));
clearIndicesCacheRequest.listenerThreaded(false);
try {
clearIndicesCacheRequest.filterCache(request.paramAsBoolean("filter", clearIndicesCacheRequest.filterCache()));
clearIndicesCacheRequest.fieldDataCache(request.paramAsBoolean("field_data", clearIndicesCacheRequest.fieldDataCache()));
@ -64,8 +65,6 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
clearIndicesCacheRequest.bloomCache(request.paramAsBoolean("bloom", clearIndicesCacheRequest.bloomCache()));
clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields()));
// we just send back a response, no need to fork a listener
clearIndicesCacheRequest.listenerThreaded(false);
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
// since we don't spawn, don't allow no_threads, but change it to a single thread

View File

@ -49,6 +49,7 @@ public class RestCloseIndexAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(request.param("index"));
closeIndexRequest.listenerThreaded(false);
closeIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().close(closeIndexRequest, new ActionListener<CloseIndexResponse>() {
@Override

View File

@ -50,6 +50,7 @@ public class RestCreateIndexAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request.param("index"));
createIndexRequest.listenerThreaded(false);
if (request.hasContent()) {
try {
createIndexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());

View File

@ -51,6 +51,7 @@ public class RestDeleteIndexAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(splitIndices(request.param("index")));
deleteIndexRequest.listenerThreaded(false);
deleteIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().delete(deleteIndexRequest, new ActionListener<DeleteIndexResponse>() {
@Override

View File

@ -29,7 +29,8 @@ import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.rest.*;
import static org.elasticsearch.rest.RestRequest.Method.HEAD;
import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.action.support.RestActions.splitIndices;
/**
@ -51,7 +52,6 @@ public class RestIndicesExistsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
indicesExistsRequest.listenerThreaded(false);
client.admin().indices().exists(indicesExistsRequest, new ActionListener<IndicesExistsResponse>() {
@Override

View File

@ -56,7 +56,6 @@ public class RestFlushAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
FlushRequest flushRequest = new FlushRequest(RestActions.splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
flushRequest.listenerThreaded(false);
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {

View File

@ -51,6 +51,7 @@ public class RestDeleteMappingAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteMappingRequest deleteMappingRequest = deleteMappingRequest(splitIndices(request.param("index")));
deleteMappingRequest.listenerThreaded(false);
deleteMappingRequest.type(request.param("type"));
client.admin().indices().deleteMapping(deleteMappingRequest, new ActionListener<DeleteMappingResponse>() {
@Override

View File

@ -69,6 +69,8 @@ public class RestGetMappingAction extends BaseRestHandler {
.filterNodes(true)
.filteredIndices(indices);
clusterStateRequest.listenerThreaded(false);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse response) {

View File

@ -56,6 +56,7 @@ public class RestPutMappingAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
PutMappingRequest putMappingRequest = putMappingRequest(splitIndices(request.param("index")));
putMappingRequest.listenerThreaded(false);
putMappingRequest.type(request.param("type"));
putMappingRequest.source(request.contentAsString());
putMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));

View File

@ -49,6 +49,7 @@ public class RestOpenIndexAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
OpenIndexRequest openIndexRequest = new OpenIndexRequest(request.param("index"));
openIndexRequest.listenerThreaded(false);
openIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().open(openIndexRequest, new ActionListener<OpenIndexResponse>() {
@Override

View File

@ -57,6 +57,7 @@ public class RestOptimizeAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
OptimizeRequest optimizeRequest = new OptimizeRequest(RestActions.splitIndices(request.param("index")));
optimizeRequest.listenerThreaded(false);
try {
optimizeRequest.waitForMerge(request.paramAsBoolean("wait_for_merge", optimizeRequest.waitForMerge()));
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
@ -64,8 +65,6 @@ public class RestOptimizeAction extends BaseRestHandler {
optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush()));
optimizeRequest.refresh(request.paramAsBoolean("refresh", optimizeRequest.refresh()));
// we just send back a response, no need to fork a listener
optimizeRequest.listenerThreaded(false);
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
// since we don't spawn, don't allow no_threads, but change it to a single thread

View File

@ -56,7 +56,6 @@ public class RestRefreshAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
RefreshRequest refreshRequest = new RefreshRequest(RestActions.splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
refreshRequest.listenerThreaded(false);
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {

View File

@ -51,7 +51,6 @@ public class RestIndicesSegmentsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
indicesSegmentsRequest.listenerThreaded(false);
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {

View File

@ -64,6 +64,7 @@ public class RestGetSettingsAction extends BaseRestHandler {
.filterRoutingTable(true)
.filterNodes(true)
.filteredIndices(indices);
clusterStateRequest.listenerThreaded(false);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override

View File

@ -55,6 +55,8 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(splitIndices(request.param("index")));
updateSettingsRequest.listenerThreaded(false);
ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder();
String bodySettings = request.contentAsString();
if (Strings.hasText(bodySettings)) {

View File

@ -81,6 +81,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
boolean clear = request.paramAsBoolean("clear", false);
if (clear) {
indicesStatsRequest.clear();
@ -136,6 +137,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().docs(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
@ -173,6 +175,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().store(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
@ -210,6 +213,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().indexing(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
if (request.hasParam("types")) {
@ -253,6 +257,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().search(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
if (request.hasParam("groups")) {
@ -296,6 +301,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().get(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
@ -332,6 +338,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().merge(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
@ -369,6 +376,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().flush(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
@ -406,6 +414,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().warmer(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
@ -443,6 +452,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().refresh(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));

View File

@ -58,7 +58,6 @@ public class RestIndicesStatusAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
indicesStatusRequest.listenerThreaded(false);
indicesStatusRequest.recovery(request.paramAsBoolean("recovery", indicesStatusRequest.recovery()));
indicesStatusRequest.snapshot(request.paramAsBoolean("snapshot", indicesStatusRequest.snapshot()));

View File

@ -49,6 +49,7 @@ public class RestDeleteIndexTemplateAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(request.param("name"));
deleteIndexTemplateRequest.listenerThreaded(false);
deleteIndexTemplateRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().deleteTemplate(deleteIndexTemplateRequest, new ActionListener<DeleteIndexTemplateResponse>() {
@Override

View File

@ -66,6 +66,8 @@ public class RestGetIndexTemplateAction extends BaseRestHandler {
.filteredIndexTemplates(request.param("name"))
.filteredIndices("_na");
clusterStateRequest.listenerThreaded(false);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse response) {

View File

@ -59,7 +59,7 @@ public class RestPutIndexTemplateAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
PutIndexTemplateRequest putRequest = new PutIndexTemplateRequest(request.param("name"));
putRequest.listenerThreaded(false);
putRequest.template(request.param("template", putRequest.template()));
putRequest.order(request.paramAsInt("order", putRequest.order()));

View File

@ -61,7 +61,6 @@ public class RestValidateQueryAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
ValidateQueryRequest validateQueryRequest = new ValidateQueryRequest(RestActions.splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
validateQueryRequest.listenerThreaded(false);
try {
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD);
@ -109,7 +108,7 @@ public class RestValidateQueryAction extends BaseRestHandler {
buildBroadcastShardsHeader(builder, response);
if(response.queryExplanations() != null && !response.queryExplanations().isEmpty()) {
if (response.queryExplanations() != null && !response.queryExplanations().isEmpty()) {
builder.startArray("explanations");
for (QueryExplanation explanation : response.queryExplanations()) {
builder.startObject();

View File

@ -51,6 +51,7 @@ public class RestDeleteWarmerAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name"))
.indices(RestActions.splitIndices(request.param("index")));
deleteWarmerRequest.listenerThreaded(false);
client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener<DeleteWarmerResponse>() {
@Override
public void onResponse(DeleteWarmerResponse response) {

View File

@ -67,6 +67,8 @@ public class RestGetWarmerAction extends BaseRestHandler {
.filterMetaData(false)
.filteredIndices(indices);
clusterStateRequest.listenerThreaded(false);
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse response) {

View File

@ -50,6 +50,7 @@ public class RestPutWarmerAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
PutWarmerRequest putWarmerRequest = new PutWarmerRequest(request.param("name"));
putWarmerRequest.listenerThreaded(false);
SearchRequest searchRequest = new SearchRequest(RestActions.splitIndices(request.param("index")))
.types(RestActions.splitTypes(request.param("type")))
.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());

View File

@ -50,8 +50,6 @@ import static org.elasticsearch.rest.action.support.RestXContentBuilder.restCont
* { "create" : { "_index" : "test", "_type" : "type1", "_id" : "1" }
* { "type1" : { "field1" : "value1" } }
* </pre>
*
*
*/
public class RestBulkAction extends BaseRestHandler {
@ -70,6 +68,7 @@ public class RestBulkAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
BulkRequest bulkRequest = Requests.bulkRequest();
bulkRequest.listenerThreaded(false);
String defaultIndex = request.param("index");
String defaultType = request.param("type");

View File

@ -61,7 +61,6 @@ public class RestCountAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
CountRequest countRequest = new CountRequest(RestActions.splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
countRequest.listenerThreaded(false);
try {
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD);

View File

@ -54,15 +54,15 @@ public class RestDeleteAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
deleteRequest.listenerThreaded(false);
deleteRequest.operationThreaded(true);
deleteRequest.parent(request.param("parent"));
deleteRequest.routing(request.param("routing"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
deleteRequest.refresh(request.paramAsBoolean("refresh", deleteRequest.refresh()));
deleteRequest.version(RestActions.parseVersion(request));
// we just send a response, no need to fork
deleteRequest.listenerThreaded(false);
// we don't spawn, then fork if local
deleteRequest.operationThreaded(true);
deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType()));
String replicationType = request.param("replication");

View File

@ -58,7 +58,6 @@ public class RestDeleteByQueryAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(splitIndices(request.param("index")));
// we just build a response and send it, no need to fork a thread
deleteByQueryRequest.listenerThreaded(false);
try {
if (request.hasContent()) {

View File

@ -50,9 +50,7 @@ public class RestGetAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
final GetRequest getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
// no need to have a threaded listener since we just send back a response
getRequest.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
getRequest.operationThreaded(true);
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.parent(request.param("parent"));

View File

@ -66,6 +66,8 @@ public class RestIndexAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(true);
indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
@ -102,10 +104,6 @@ public class RestIndexAction extends BaseRestHandler {
if (consistencyLevel != null) {
indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
// we just send a response, no need to fork
indexRequest.listenerThreaded(false);
// we don't spawn, then fork if local
indexRequest.operationThreaded(true);
client.index(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {

View File

@ -52,6 +52,7 @@ public class RestMainAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.listenerThreaded(false);
clusterStateRequest.masterNodeTimeout(TimeValue.timeValueMillis(0));
clusterStateRequest.local(true);
clusterStateRequest.filterAll().filterBlocks(false);

View File

@ -55,6 +55,7 @@ public class RestMoreLikeThisAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
MoreLikeThisRequest mltRequest = moreLikeThisRequest(request.param("index")).type(request.param("type")).id(request.param("id"));
mltRequest.listenerThreaded(false);
try {
mltRequest.fields(request.paramAsStringArray("mlt_fields", null));
mltRequest.percentTermsToMatch(request.paramAsFloat("percent_terms_to_match", -1));

View File

@ -51,6 +51,7 @@ public class RestPercolateAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
PercolateRequest percolateRequest = new PercolateRequest(request.param("index"), request.param("type"));
percolateRequest.listenerThreaded(false);
percolateRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
// we just send a response, no need to fork

View File

@ -56,6 +56,7 @@ public class RestMultiSearchAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
multiSearchRequest.listenerThreaded(false);
String[] indices = RestActions.splitIndices(request.param("index"));
String[] types = RestActions.splitTypes(request.param("type"));

View File

@ -60,12 +60,12 @@ public class RestSearchScrollAction extends BaseRestHandler {
scrollId = request.contentAsString();
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
searchScrollRequest.listenerThreaded(false);
try {
String scroll = request.param("scroll");
if (scroll != null) {
searchScrollRequest.scroll(new Scroll(parseTimeValue(scroll, null)));
}
searchScrollRequest.listenerThreaded(false);
SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null);
if (operationThreading != null) {
if (operationThreading == SearchOperationThreading.NO_THREADS) {

View File

@ -55,6 +55,7 @@ public class RestUpdateAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
UpdateRequest updateRequest = new UpdateRequest(request.param("index"), request.param("type"), request.param("id"));
updateRequest.listenerThreaded(false);
updateRequest.routing(request.param("routing"));
updateRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));
@ -68,8 +69,6 @@ public class RestUpdateAction extends BaseRestHandler {
updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
updateRequest.percolate(request.param("percolate", null));
// we just send a response, no need to fork
updateRequest.listenerThreaded(false);
updateRequest.script(request.param("script"));
updateRequest.scriptLang(request.param("lang"));
for (Map.Entry<String, String> entry : request.params().entrySet()) {