Using AtomicArray to collect responses in mget and bulk indexing (instead of synchronised)

This commit is contained in:
Boaz Leskes 2013-07-15 22:24:43 +02:00
parent 28b9e25053
commit c3038889f9
5 changed files with 68 additions and 38 deletions

View File

@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -172,11 +173,12 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
updateRequest.index(clusterState.metaData().concreteIndex(updateRequest.index())); updateRequest.index(clusterState.metaData().concreteIndex(updateRequest.index()));
} }
} }
final BulkItemResponse[] responses = new BulkItemResponse[bulkRequest.requests.size()]; final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());
// first, go over all the requests and create a ShardId -> Operations mapping // first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap(); Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();
for (int i = 0; i < bulkRequest.requests.size(); i++) { for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i); ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {
@ -228,7 +230,7 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
} }
if (requestsByShard.isEmpty()) { if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime)); listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), System.currentTimeMillis() - startTime));
return; return;
} }
@ -242,10 +244,8 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() { shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
@Override @Override
public void onResponse(BulkShardResponse bulkShardResponse) { public void onResponse(BulkShardResponse bulkShardResponse) {
synchronized (responses) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
responses[bulkItemResponse.getItemId()] = bulkItemResponse; responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
@ -256,21 +256,19 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
// create failures for all relevant requests // create failures for all relevant requests
String message = ExceptionsHelper.detailedMessage(e); String message = ExceptionsHelper.detailedMessage(e);
synchronized (responses) {
for (BulkItemRequest request : requests) { for (BulkItemRequest request : requests) {
if (request.request() instanceof IndexRequest) { if (request.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request.request(); IndexRequest indexRequest = (IndexRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH), responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message)); new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message)));
} else if (request.request() instanceof DeleteRequest) { } else if (request.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request.request(); DeleteRequest deleteRequest = (DeleteRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), "delete", responses.set(request.id(), new BulkItemResponse(request.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message)); new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message)));
} else if (request.request() instanceof UpdateRequest) { } else if (request.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request.request(); UpdateRequest updateRequest = (UpdateRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), "update", responses.set(request.id(), new BulkItemResponse(request.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message)); new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message)));
}
} }
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
@ -279,7 +277,7 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
} }
private void finishHim() { private void finishHim() {
listener.onResponse(new BulkResponse(responses, System.currentTimeMillis() - startTime)); listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), System.currentTimeMillis() - startTime));
} }
}); });
} }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler; import org.elasticsearch.transport.BaseTransportRequestHandler;
@ -58,13 +59,13 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
final MultiGetItemResponse[] responses = new MultiGetItemResponse[request.items.size()]; final AtomicArray<MultiGetItemResponse> responses = new AtomicArray<MultiGetItemResponse>(request.items.size());
Map<ShardId, MultiGetShardRequest> shardRequests = new HashMap<ShardId, MultiGetShardRequest>(); Map<ShardId, MultiGetShardRequest> shardRequests = new HashMap<ShardId, MultiGetShardRequest>();
for (int i = 0; i < request.items.size(); i++) { for (int i = 0; i < request.items.size(); i++) {
MultiGetRequest.Item item = request.items.get(i); MultiGetRequest.Item item = request.items.get(i);
if (!clusterState.metaData().hasConcreteIndex(item.index())) { if (!clusterState.metaData().hasConcreteIndex(item.index())) {
responses[i] = new MultiGetItemResponse(null, new MultiGetResponse.Failure(item.index(), item.type(), item.id(), "[" + item.index() + "] missing")); responses.set(i, new MultiGetItemResponse(null, new MultiGetResponse.Failure(item.index(), item.type(), item.id(), "[" + item.index() + "] missing")));
continue; continue;
} }
@ -86,7 +87,7 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
if (shardRequests.size() == 0) { if (shardRequests.size() == 0) {
// only failures.. // only failures..
listener.onResponse(new MultiGetResponse(responses)); listener.onResponse(new MultiGetResponse(responses.toArray(new MultiGetItemResponse[responses.length()])));
} }
final AtomicInteger counter = new AtomicInteger(shardRequests.size()); final AtomicInteger counter = new AtomicInteger(shardRequests.size());
@ -95,10 +96,8 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
shardAction.execute(shardRequest, new ActionListener<MultiGetShardResponse>() { shardAction.execute(shardRequest, new ActionListener<MultiGetShardResponse>() {
@Override @Override
public void onResponse(MultiGetShardResponse response) { public void onResponse(MultiGetShardResponse response) {
synchronized (responses) {
for (int i = 0; i < response.locations.size(); i++) { for (int i = 0; i < response.locations.size(); i++) {
responses[response.locations.get(i)] = new MultiGetItemResponse(response.responses.get(i), response.failures.get(i)); responses.set(response.locations.get(i), new MultiGetItemResponse(response.responses.get(i), response.failures.get(i)));
}
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
@ -109,11 +108,9 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
// create failures for all relevant requests // create failures for all relevant requests
String message = ExceptionsHelper.detailedMessage(e); String message = ExceptionsHelper.detailedMessage(e);
synchronized (responses) {
for (int i = 0; i < shardRequest.locations.size(); i++) { for (int i = 0; i < shardRequest.locations.size(); i++) {
responses[shardRequest.locations.get(i)] = new MultiGetItemResponse(null, responses.set(shardRequest.locations.get(i), new MultiGetItemResponse(null,
new MultiGetResponse.Failure(shardRequest.index(), shardRequest.types.get(i), shardRequest.ids.get(i), message)); new MultiGetResponse.Failure(shardRequest.index(), shardRequest.types.get(i), shardRequest.ids.get(i), message)));
}
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
@ -121,7 +118,7 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
} }
private void finishHim() { private void finishHim() {
listener.onResponse(new MultiGetResponse(responses)); listener.onResponse(new MultiGetResponse(responses.toArray(new MultiGetItemResponse[responses.length()])));
} }
}); });
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import java.lang.reflect.Array;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -98,6 +99,23 @@ public class AtomicArray<E> {
return nonNullList; return nonNullList;
} }
/**
* Copies the content of the underlying atomic array to a normal one. If the supplied array is too small a new one will be allocated.
* If the supplied array's length is longer than needed, the element in the array immediately following the end of the collection is set to
* <tt>null</tt>. All in similar fashion to {@link ArrayList#toArray}
*/
public E[] toArray(E[] a) {
if (a.length < array.length()) {
a = (E[]) Array.newInstance(a.getClass().getComponentType(), array.length());
} else if (a.length > array.length()) {
a[array.length()] = null;
}
for (int i = 0; i < array.length(); i++) {
a[i] = array.get(i);
}
return a;
}
/** /**
* An entry within the array. * An entry within the array.
*/ */

View File

@ -48,13 +48,16 @@ public class SingleThreadBulkStress {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Random random = new Random(); Random random = new Random();
int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1"));
int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1"));
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("index.refresh_interval", "1s") .put("index.refresh_interval", "1s")
.put("index.merge.async", true) .put("index.merge.async", true)
.put("index.translog.flush_threshold_ops", 5000) .put("index.translog.flush_threshold_ops", 5000)
.put("gateway.type", "none") .put("gateway.type", "none")
.put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_SHARDS, shardsCount)
.put(SETTING_NUMBER_OF_REPLICAS, 1) .put(SETTING_NUMBER_OF_REPLICAS, replicaCount)
.build(); .build();
Node[] nodes = new Node[1]; Node[] nodes = new Node[1];

View File

@ -20,7 +20,9 @@
package org.elasticsearch.test.stress.indexing; package org.elasticsearch.test.stress.indexing;
import jsr166y.ThreadLocalRandom; import jsr166y.ThreadLocalRandom;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
@ -39,6 +41,8 @@ public class BulkIndexingStressTest {
final Settings nodeSettings = ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2).build(); final Settings nodeSettings = ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2).build();
// ESLogger logger = Loggers.getLogger("org.elasticsearch");
// logger.setLevel("DEBUG");
Node[] nodes = new Node[NUMBER_OF_NODES]; Node[] nodes = new Node[NUMBER_OF_NODES];
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeBuilder.nodeBuilder().settings(nodeSettings).node(); nodes[i] = NodeBuilder.nodeBuilder().settings(nodeSettings).node();
@ -51,7 +55,17 @@ public class BulkIndexingStressTest {
for (int i = 0; i < BATCH; i++) { for (int i = 0; i < BATCH; i++) {
bulkRequest.add(Requests.indexRequest("test" + ThreadLocalRandom.current().nextInt(NUMBER_OF_INDICES)).type("type").source("field", "value")); bulkRequest.add(Requests.indexRequest("test" + ThreadLocalRandom.current().nextInt(NUMBER_OF_INDICES)).type("type").source("field", "value"));
} }
bulkRequest.execute().actionGet(); BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
for (BulkItemResponse item : bulkResponse) {
if (item.isFailed()) {
System.out.println("failed response:" + item.getFailureMessage());
}
}
throw new RuntimeException("Failed responses");
}
;
} }
} }
} }