From c3038889f9ef2d1f8a906b14b09ed816e96eb289 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 15 Jul 2013 22:24:43 +0200 Subject: [PATCH] Using AtomicArray to collect responses in mget and bulk indexing (instead of synchronised) --- .../action/bulk/TransportBulkAction.java | 42 +++++++++---------- .../action/get/TransportMultiGetAction.java | 23 +++++----- .../common/util/concurrent/AtomicArray.java | 18 ++++++++ .../stress/SingleThreadBulkStress.java | 7 +++- .../indexing/BulkIndexingStressTest.java | 16 ++++++- 5 files changed, 68 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index ec46472774a..0bc87739b2d 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; @@ -172,11 +173,12 @@ public class TransportBulkAction extends TransportAction responses = new AtomicArray(bulkRequest.requests.size()); // first, go over all the requests and create a ShardId -> Operations mapping Map> requestsByShard = Maps.newHashMap(); + for (int i = 0; i < bulkRequest.requests.size(); i++) { ActionRequest request = bulkRequest.requests.get(i); if (request instanceof IndexRequest) { @@ -228,7 +230,7 @@ public class TransportBulkAction extends TransportAction() { @Override public void onResponse(BulkShardResponse bulkShardResponse) { - synchronized (responses) { - for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { - responses[bulkItemResponse.getItemId()] = bulkItemResponse; - } + for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { + responses.set(bulkItemResponse.getItemId(), bulkItemResponse); } if (counter.decrementAndGet() == 0) { finishHim(); @@ -256,21 +256,19 @@ public class TransportBulkAction extends TransportAction responses = new AtomicArray(request.items.size()); Map shardRequests = new HashMap(); for (int i = 0; i < request.items.size(); i++) { MultiGetRequest.Item item = request.items.get(i); if (!clusterState.metaData().hasConcreteIndex(item.index())) { - responses[i] = new MultiGetItemResponse(null, new MultiGetResponse.Failure(item.index(), item.type(), item.id(), "[" + item.index() + "] missing")); + responses.set(i, new MultiGetItemResponse(null, new MultiGetResponse.Failure(item.index(), item.type(), item.id(), "[" + item.index() + "] missing"))); continue; } @@ -86,7 +87,7 @@ public class TransportMultiGetAction extends TransportAction() { @Override public void onResponse(MultiGetShardResponse response) { - synchronized (responses) { - for (int i = 0; i < response.locations.size(); i++) { - responses[response.locations.get(i)] = new MultiGetItemResponse(response.responses.get(i), response.failures.get(i)); - } + for (int i = 0; i < response.locations.size(); i++) { + responses.set(response.locations.get(i), new MultiGetItemResponse(response.responses.get(i), response.failures.get(i))); } if (counter.decrementAndGet() == 0) { finishHim(); @@ -109,11 +108,9 @@ public class TransportMultiGetAction extends TransportAction { return nonNullList; } + /** + * Copies the content of the underlying atomic array to a normal one. If the supplied array is too small a new one will be allocated. + * If the supplied array's length is longer than needed, the element in the array immediately following the end of the collection is set to + * null. All in similar fashion to {@link ArrayList#toArray} + */ + public E[] toArray(E[] a) { + if (a.length < array.length()) { + a = (E[]) Array.newInstance(a.getClass().getComponentType(), array.length()); + } else if (a.length > array.length()) { + a[array.length()] = null; + } + for (int i = 0; i < array.length(); i++) { + a[i] = array.get(i); + } + return a; + } + /** * An entry within the array. */ diff --git a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java index 4bf597c53be..49d5a489ab8 100644 --- a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java +++ b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java @@ -48,13 +48,16 @@ public class SingleThreadBulkStress { public static void main(String[] args) throws Exception { Random random = new Random(); + int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1")); + int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1")); + Settings settings = settingsBuilder() .put("index.refresh_interval", "1s") .put("index.merge.async", true) .put("index.translog.flush_threshold_ops", 5000) .put("gateway.type", "none") - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SHARDS, shardsCount) + .put(SETTING_NUMBER_OF_REPLICAS, replicaCount) .build(); Node[] nodes = new Node[1]; diff --git a/src/test/java/org/elasticsearch/test/stress/indexing/BulkIndexingStressTest.java b/src/test/java/org/elasticsearch/test/stress/indexing/BulkIndexingStressTest.java index 3bedcad4a71..f40979d4251 100644 --- a/src/test/java/org/elasticsearch/test/stress/indexing/BulkIndexingStressTest.java +++ b/src/test/java/org/elasticsearch/test/stress/indexing/BulkIndexingStressTest.java @@ -20,7 +20,9 @@ package org.elasticsearch.test.stress.indexing; import jsr166y.ThreadLocalRandom; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.common.settings.ImmutableSettings; @@ -39,6 +41,8 @@ public class BulkIndexingStressTest { final Settings nodeSettings = ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2).build(); +// ESLogger logger = Loggers.getLogger("org.elasticsearch"); +// logger.setLevel("DEBUG"); Node[] nodes = new Node[NUMBER_OF_NODES]; for (int i = 0; i < nodes.length; i++) { nodes[i] = NodeBuilder.nodeBuilder().settings(nodeSettings).node(); @@ -51,7 +55,17 @@ public class BulkIndexingStressTest { for (int i = 0; i < BATCH; i++) { bulkRequest.add(Requests.indexRequest("test" + ThreadLocalRandom.current().nextInt(NUMBER_OF_INDICES)).type("type").source("field", "value")); } - bulkRequest.execute().actionGet(); + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + if (bulkResponse.hasFailures()) { + for (BulkItemResponse item : bulkResponse) { + if (item.isFailed()) { + System.out.println("failed response:" + item.getFailureMessage()); + } + } + + throw new RuntimeException("Failed responses"); + } + ; } } }