move msearch to use atomic array instead of synchronize

This commit is contained in:
Shay Banon 2014-01-27 13:22:49 +01:00
parent 24abb6cf3f
commit c1c2d343c4
1 changed files with 7 additions and 10 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
@ -56,16 +57,14 @@ public class TransportMultiSearchAction extends TransportAction<MultiSearchReque
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
final MultiSearchResponse.Item[] responses = new MultiSearchResponse.Item[request.requests().size()];
final AtomicInteger counter = new AtomicInteger(responses.length);
for (int i = 0; i < responses.length; i++) {
final AtomicArray<MultiSearchResponse.Item> responses = new AtomicArray<MultiSearchResponse.Item>(request.requests().size());
final AtomicInteger counter = new AtomicInteger(responses.length());
for (int i = 0; i < responses.length(); i++) {
final int index = i;
searchAction.execute(request.requests().get(i), new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
synchronized (responses) {
responses[index] = new MultiSearchResponse.Item(searchResponse, null);
}
responses.set(index, new MultiSearchResponse.Item(searchResponse, null));
if (counter.decrementAndGet() == 0) {
finishHim();
}
@ -73,16 +72,14 @@ public class TransportMultiSearchAction extends TransportAction<MultiSearchReque
@Override
public void onFailure(Throwable e) {
synchronized (responses) {
responses[index] = new MultiSearchResponse.Item(null, ExceptionsHelper.detailedMessage(e));
}
responses.set(index, new MultiSearchResponse.Item(null, ExceptionsHelper.detailedMessage(e)));
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
listener.onResponse(new MultiSearchResponse(responses));
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
}
});
}