simplify threaded listener invocation by wrapping the listener with a threaded listener and not check in each action for it

This commit is contained in:
kimchy 2011-02-15 07:52:24 +02:00
parent 3ed848a495
commit 36efde8c1d
20 changed files with 120 additions and 384 deletions

View File

@ -63,8 +63,6 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
private final boolean allowIdGeneration;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TransportShardBulkAction shardBulkAction;
@ -73,8 +71,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
@Inject public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction) {
super(settings);
this.threadPool = threadPool;
super(settings, threadPool);
this.clusterService = clusterService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
@ -209,16 +206,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
}
if (requestsByShard.isEmpty()) {
// all failures, no shards to process, send a response
if (bulkRequest.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
});
} else {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
return;
}
@ -263,15 +251,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
}
private void finishHim() {
if (bulkRequest.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
});
} else {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime));
}
});
}

View File

@ -69,9 +69,9 @@ public class TransportMoreLikeThisAction extends BaseAction<MoreLikeThisRequest,
private final ClusterService clusterService;
@Inject public TransportMoreLikeThisAction(Settings settings, TransportSearchAction searchAction, TransportGetAction getAction,
@Inject public TransportMoreLikeThisAction(Settings settings, ThreadPool threadPool, TransportSearchAction searchAction, TransportGetAction getAction,
ClusterService clusterService, IndicesService indicesService, TransportService transportService) {
super(settings);
super(settings, threadPool);
this.searchAction = searchAction;
this.getAction = getAction;
this.indicesService = indicesService;

View File

@ -56,12 +56,13 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
private final boolean optimizeSingleShard;
@Inject public TransportSearchAction(Settings settings, TransportService transportService, ClusterService clusterService,
@Inject public TransportSearchAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService,
TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction,
TransportSearchQueryThenFetchAction queryThenFetchAction,
TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction,
TransportSearchQueryAndFetchAction queryAndFetchAction) {
super(settings);
super(settings, threadPool);
this.clusterService = clusterService;
this.dfsQueryThenFetchAction = dfsQueryThenFetchAction;
this.queryThenFetchAction = queryThenFetchAction;

View File

@ -45,10 +45,10 @@ public class TransportSearchScrollAction extends BaseAction<SearchScrollRequest,
private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction;
@Inject public TransportSearchScrollAction(Settings settings, TransportService transportService,
@Inject public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService,
TransportSearchScrollQueryThenFetchAction queryThenFetchAction,
TransportSearchScrollQueryAndFetchAction queryAndFetchAction) {
super(settings);
super(settings, threadPool);
this.queryThenFetchAction = queryThenFetchAction;
this.queryAndFetchAction = queryAndFetchAction;

View File

@ -156,7 +156,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()));
listener.onFailure(new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures()));
} finally {
searchCache.releaseDfsResults(dfsResults);
searchCache.releaseQueryFetchResults(queryFetchResults);
@ -170,7 +170,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), dfsResults);
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -163,7 +163,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerExecuteFetchPhase();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
@ -249,7 +249,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
searchCache.releaseDfsResults(dfsResults);
@ -264,7 +264,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults);
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -82,7 +82,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryFetchResults.values());
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryFetchResults(queryFetchResults);
}
}

View File

@ -163,7 +163,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
searchCache.releaseQueryResults(queryResults);
@ -177,7 +177,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values());
}
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -104,7 +104,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
public void start() {
if (scrollId.values().length == 0) {
invokeListener(new SearchPhaseExecutionException("query", "no nodes to search on", null));
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
}
int localOperations = 0;
@ -199,7 +199,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
}
}
@ -211,32 +211,8 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
scrollId = request.scrollId();
}
searchCache.releaseQueryFetchResults(queryFetchResults);
invokeListener(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
}
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}
});
} else {
listener.onFailure(t);
}
}
}
}

View File

@ -109,7 +109,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
public void start() {
if (scrollId.values().length == 0) {
invokeListener(new SearchPhaseExecutionException("query", "no nodes to search on", null));
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
}
final AtomicInteger counter = new AtomicInteger(scrollId.values().length);
@ -228,7 +228,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
try {
innerFinishHim();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
}
}
@ -238,34 +238,10 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
if (request.scroll() != null) {
scrollId = request.scrollId();
}
invokeListener(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.values().length, successfulOps.get(),
System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
searchCache.releaseQueryResults(queryResults);
searchCache.releaseFetchResults(fetchResults);
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
}
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}
});
} else {
listener.onFailure(t);
}
}
}
}

View File

@ -55,8 +55,6 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
*/
public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest, SearchResponse> {
protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final SearchServiceTransportAction searchService;
@ -67,8 +65,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportSearchCache searchCache, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.threadPool = threadPool;
super(settings, threadPool);
this.clusterService = clusterService;
this.searchCache = searchCache;
this.searchService = searchService;
@ -221,7 +218,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
}
invokeListener(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
}
}
@ -247,12 +244,12 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
if (successulOps.get() == 0) {
// no successful ops, raise an exception
invokeListener(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
} else {
try {
moveToSecondPhase();
} catch (Exception e) {
invokeListener(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
}
} else {
@ -325,30 +322,6 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
protected void invokeListener(final SearchResponse response) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
}
protected void invokeListener(final Throwable t) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(t);
}
});
} else {
listener.onFailure(t);
}
}
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, InternalSearchRequest request, SearchServiceListener<FirstResult> listener);
protected abstract void processFirstPhaseResult(ShardRouting shard, FirstResult result);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import static org.elasticsearch.action.support.PlainActionFuture.*;
@ -31,8 +32,11 @@ import static org.elasticsearch.action.support.PlainActionFuture.*;
*/
public abstract class BaseAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent implements Action<Request, Response> {
protected BaseAction(Settings settings) {
protected final ThreadPool threadPool;
protected BaseAction(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
}
@Override public ActionFuture<Response> execute(Request request) throws ElasticSearchException {
@ -45,6 +49,9 @@ public abstract class BaseAction<Request extends ActionRequest, Response extends
}
@Override public void execute(Request request, ActionListener<Response> listener) {
if (request.listenerThreaded()) {
listener = new ThreadedActionListener<Response>(threadPool, listener);
}
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
@ -58,4 +65,36 @@ public abstract class BaseAction<Request extends ActionRequest, Response extends
}
protected abstract void doExecute(Request request, ActionListener<Response> listener);
static class ThreadedActionListener<Response> implements ActionListener<Response> {
private final ThreadPool threadPool;
private final ActionListener<Response> listener;
ThreadedActionListener(ThreadPool threadPool, ActionListener<Response> listener) {
this.threadPool = threadPool;
this.listener = listener;
}
@Override public void onResponse(final Response response) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
}
}
});
}
@Override public void onFailure(final Throwable e) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(e);
}
});
}
}
}

View File

@ -54,7 +54,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
final String executor;
protected TransportBroadcastOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings);
super(settings, threadPool);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
@ -179,7 +179,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
} else {
// really, no shards active in this group
onOperation(null, shardIt, null, false);
onOperation(null, shardIt, null);
}
}
// we have local operations, perform them now
@ -219,7 +219,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
final ShardRouting shard = nextShardOrNull(shardIt);
if (shard == null) {
// no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, null, false);
onOperation(null, shardIt, null);
} else {
final ShardRequest shardRequest = newShardRequest(shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
@ -227,24 +227,24 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
try {
onOperation(shard, shardOperation(shardRequest), true);
onOperation(shard, shardOperation(shardRequest));
} catch (Exception e) {
onOperation(shard, shardIt, e, true);
onOperation(shard, shardIt, e);
}
}
});
} else {
try {
onOperation(shard, shardOperation(shardRequest), false);
onOperation(shard, shardOperation(shardRequest));
} catch (Exception e) {
onOperation(shard, shardIt, e, false);
onOperation(shard, shardIt, e);
}
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, null, false);
onOperation(shard, shardIt, null);
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override public ShardResponse newInstance() {
@ -256,11 +256,11 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
@Override public void handleResponse(ShardResponse response) {
onOperation(shard, response, false);
onOperation(shard, response);
}
@Override public void handleException(TransportException e) {
onOperation(shard, shardIt, e, false);
onOperation(shard, shardIt, e);
}
});
}
@ -269,15 +269,15 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
@SuppressWarnings({"unchecked"})
private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
private void onOperation(ShardRouting shard, ShardResponse response) {
shardsResponses.set(indexCounter.getAndIncrement(), response);
if (expectedOps == counterOps.incrementAndGet()) {
finishHim(alreadyThreaded);
finishHim();
}
}
@SuppressWarnings({"unchecked"})
private void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t, boolean alreadyThreaded) {
private void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
if (!hasNextShard(shardIt)) {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
@ -302,7 +302,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
shardsResponses.set(index, t);
}
if (expectedOps == counterOps.incrementAndGet()) {
finishHim(alreadyThreaded);
finishHim();
}
return;
} else {
@ -324,26 +324,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
performOperation(shardIt, true);
}
private void finishHim(boolean alreadyThreaded) {
// if we need to execute the listener on a thread, and we are not threaded already
// then do it
if (request.listenerThreaded() && !alreadyThreaded) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
} catch (Exception e) {
listener.onFailure(e);
}
}
});
} else {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
} catch (Exception e) {
listener.onFailure(e);
}
}
private void finishHim() {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
}
}

View File

@ -47,16 +47,13 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
protected final ClusterService clusterService;
protected final ThreadPool threadPool;
final String transportAction;
final String executor;
protected TransportMasterNodeOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
super(settings, threadPool);
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.transportAction = transportAction();
this.executor = executor();

View File

@ -45,8 +45,6 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
protected final ClusterName clusterName;
protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final TransportService transportService;
@ -57,9 +55,8 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
@Inject public TransportNodesOperationAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService) {
super(settings);
super(settings, threadPool);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
@ -209,15 +206,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
}
private void finishHim() {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponse(request, responses));
}
});
} else {
listener.onResponse(newResponse(request, responses));
}
listener.onResponse(newResponse(request, responses));
}
}

View File

@ -43,16 +43,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardResponse> shardAction;
@Inject public TransportIndexReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportShardReplicationOperationAction<ShardRequest, ShardResponse> shardAction) {
super(settings);
this.threadPool = threadPool;
super(settings, threadPool);
this.clusterService = clusterService;
this.shardAction = shardAction;
@ -91,15 +88,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
@Override public void onResponse(ShardResponse result) {
shardsResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
});
} else {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
listener.onResponse(newResponseInstance(request, shardsResponses));
}
}
@ -109,15 +98,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
shardsResponses.set(index, e);
}
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
});
} else {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
listener.onResponse(newResponseInstance(request, shardsResponses));
}
}
});

View File

@ -40,8 +40,6 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
@ -51,8 +49,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
@Inject public TransportIndicesReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction) {
super(settings);
this.threadPool = threadPool;
super(settings, threadPool);
this.clusterService = clusterService;
this.indexAction = indexAction;
@ -83,15 +80,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
@Override public void onResponse(IndexResponse result) {
indexResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, indexResponses));
}
});
} else {
listener.onResponse(newResponseInstance(request, indexResponses));
}
listener.onResponse(newResponseInstance(request, indexResponses));
}
}
@ -102,15 +91,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
indexResponses.set(index, e);
}
if (completionCounter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(newResponseInstance(request, indexResponses));
}
});
} else {
listener.onResponse(newResponseInstance(request, indexResponses));
}
listener.onResponse(newResponseInstance(request, indexResponses));
}
}
});

View File

@ -70,8 +70,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final IndicesService indicesService;
protected final ThreadPool threadPool;
protected final ShardStateAction shardStateAction;
protected final ReplicationType defaultReplicationType;
@ -86,11 +84,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected TransportShardReplicationOperationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction) {
super(settings);
super(settings, threadPool);
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.threadPool = threadPool;
this.shardStateAction = shardStateAction;
this.transportAction = transportAction();
@ -331,11 +328,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
request.beforeLocalFork();
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
performOnPrimary(shard.id(), fromClusterEvent, true, shard, clusterState);
performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState);
}
});
} else {
performOnPrimary(shard.id(), fromClusterEvent, false, shard, clusterState);
performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState);
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
@ -372,15 +369,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we should never get here, but here we go
if (!foundPrimary) {
final UnavailableShardsException failure = new UnavailableShardsException(shardIt.shardId(), request.toString());
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(failure);
}
});
} else {
listener.onFailure(failure);
}
listener.onFailure(failure);
}
return true;
}
@ -418,24 +407,16 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
clusterService.remove(this);
final UnavailableShardsException failure = new UnavailableShardsException(shardId, "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(failure);
}
});
} else {
listener.onFailure(failure);
}
listener.onFailure(failure);
}
});
}
}
private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard, ClusterState clusterState) {
private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final ShardRouting shard, ClusterState clusterState) {
try {
PrimaryResponse<Response> response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
performReplicas(response, alreadyThreaded);
performReplicas(response);
} catch (Exception e) {
// shard has not been allocated yet, retry it here
if (e instanceof IndexShardMissingException || e instanceof IllegalIndexShardStateException || e instanceof IndexMissingException) {
@ -449,18 +430,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
private void performReplicas(final PrimaryResponse<Response> response, boolean alreadyThreaded) {
private void performReplicas(final PrimaryResponse<Response> response) {
if (ignoreReplicas() || shardIt.size() == 1 /* no replicas */) {
postPrimaryOperation(request, response);
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
}
listener.onResponse(response.response());
return;
}
@ -492,30 +465,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (replicaCounter == 0) {
postPrimaryOperation(request, response);
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
}
listener.onResponse(response.response());
return;
}
if (replicationType == ReplicationType.ASYNC) {
postPrimaryOperation(request, response);
// async replication, notify the listener
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
}
listener.onResponse(response.response());
// now, trick the counter so it won't decrease to 0 and notify the listeners
replicaCounter = -100;
}
@ -544,10 +501,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it
if (!doOnlyOnRelocating) {
performOnReplica(response, alreadyThreaded, counter, shard, shard.currentNodeId());
performOnReplica(response, counter, shard, shard.currentNodeId());
}
if (shard.relocating()) {
performOnReplica(response, alreadyThreaded, counter, shard, shard.relocatingNodeId());
performOnReplica(response, counter, shard, shard.relocatingNodeId());
}
}
@ -555,32 +512,16 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
postPrimaryOperation(request, response);
// we also invoke here in case replicas finish before postPrimaryAction does
if (counter.decrementAndGet() == 0) {
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
}
listener.onResponse(response.response());
}
}
private void performOnReplica(final PrimaryResponse<Response> response, boolean alreadyThreaded, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
private void performOnReplica(final PrimaryResponse<Response> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!nodes.nodeExists(nodeId)) {
if (counter.decrementAndGet() == 0) {
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response.response());
} else {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
}
listener.onResponse(response.response());
}
return;
}
@ -603,15 +544,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private void finishIfPossible() {
if (counter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
} else {
listener.onResponse(response.response());
}
listener.onResponse(response.response());
}
}
});
@ -643,15 +576,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
if (counter.decrementAndGet() == 0) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response.response());
}
});
} else {
listener.onResponse(response.response());
}
listener.onResponse(response.response());
}
}
}

View File

@ -48,17 +48,14 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
protected final TransportService transportService;
protected final ThreadPool threadPool;
final String transportAction;
final String transportShardAction;
final String executor;
protected TransportSingleCustomOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings);
super(settings, threadPool);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportAction = transportAction();
this.transportShardAction = transportShardAction();
@ -146,15 +143,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
} else {
try {
final Response response = shardOperation(request, shard.id());
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
listener.onResponse(response);
return;
} catch (Exception e) {
onFailure(shard, e);
@ -195,15 +184,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
} else {
try {
final Response response = shardOperation(request, shard.id());
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
listener.onResponse(response);
return;
} catch (Exception e) {
onFailure(shard, e);
@ -222,15 +203,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
@Override public void handleResponse(final Response response) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
listener.onResponse(response);
}
@Override public void handleException(TransportException exp) {
@ -249,16 +222,7 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
logger.debug("failed to execute [" + request + "]", failure);
}
}
if (request.listenerThreaded()) {
final Exception fFailure = failure;
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(fFailure);
}
});
} else {
listener.onFailure(failure);
}
listener.onFailure(failure);
}
}
}

View File

@ -48,17 +48,14 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
protected final TransportService transportService;
protected final ThreadPool threadPool;
final String transportAction;
final String transportShardAction;
final String executor;
protected TransportShardSingleOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings);
super(settings, threadPool);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportAction = transportAction();
this.transportShardAction = transportShardAction();
@ -148,15 +145,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
} else {
try {
final Response response = shardOperation(request, shard.id());
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
listener.onResponse(response);
return;
} catch (Exception e) {
onFailure(shard, e);
@ -188,15 +177,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
@Override public void handleResponse(final Response response) {
if (request.listenerThreaded()) {
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
} else {
listener.onResponse(response);
}
listener.onResponse(response);
}
@Override public void handleException(TransportException exp) {
@ -215,16 +196,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
logger.debug(shardIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure);
}
}
if (request.listenerThreaded()) {
final Exception fFailure = failure;
threadPool.cached().execute(new Runnable() {
@Override public void run() {
listener.onFailure(fFailure);
}
});
} else {
listener.onFailure(failure);
}
listener.onFailure(failure);
}
}
}