From a7b2b7847a3798637e5b54e930612ed25b3a8804 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 30 Aug 2013 00:06:45 +0200 Subject: [PATCH] Use atomic collections to make sure all of the memory contents are visible from writing to reading thread. --- .../TransportMultiPercolateAction.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java index acb54f0117d..c5410e08446 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.percolator.PercolatorService; @@ -42,7 +43,9 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReferenceArray; /** @@ -72,20 +75,24 @@ public class TransportMultiPercolateAction extends TransportAction listener) { final ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - @SuppressWarnings("unchecked") - final List percolateRequests = (List) request.requests(); - final TIntArrayList slots = new TIntArrayList(); - final List existingDocsRequests = new ArrayList(); + final AtomicReferenceArray percolateRequests = new AtomicReferenceArray(request.requests().size()); + TIntArrayList getRequestSlots = new TIntArrayList(); + List existingDocsRequests = new ArrayList(); for (int i = 0; i < request.requests().size(); i++) { PercolateRequest percolateRequest = request.requests().get(i); percolateRequest.startTime = System.currentTimeMillis(); + percolateRequests.set(i, percolateRequest); if (percolateRequest.getRequest() != null) { existingDocsRequests.add(percolateRequest.getRequest()); - slots.add(i); + getRequestSlots.add(i); } } + // Can have a mixture of percolate requests. (normal percolate requests & percolate existing doc), + // so we need to keep track for what percolate request we had a get request + final AtomicIntegerArray percolateRequestSlotsWithGet = new AtomicIntegerArray(getRequestSlots.toArray()); + if (!existingDocsRequests.isEmpty()) { final MultiGetRequest multiGetRequest = new MultiGetRequest(); for (GetRequest getRequest : existingDocsRequests) { @@ -101,7 +108,7 @@ public class TransportMultiPercolateAction extends TransportAction percolateRequests, + private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final AtomicReferenceArray percolateRequests, final ActionListener listener, ClusterState clusterState) { - final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.size()]; + final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.length()]; final AtomicReferenceArray responsesByItemAndShard = new AtomicReferenceArray(multiPercolateRequest.requests().size()); - final AtomicArray reducedResponses = new AtomicArray(percolateRequests.size()); + final AtomicArray reducedResponses = new AtomicArray(percolateRequests.length()); + + // Keep track what slots belong to what shard, in case a request to a shard fails on all copies + final ConcurrentMap shardToSlots = ConcurrentCollections.newConcurrentMap(); // Resolving concrete indices and routing and grouping the requests by shard - final Map requestsByShard = new HashMap(); - final Map shardToSlots = new HashMap(); + Map requestsByShard = new HashMap(); + Map shardToSlotsBuilder = new HashMap(); int expectedResults = 0; - for (int i = 0; i < percolateRequests.size(); i++) { + for (int i = 0; i < percolateRequests.length(); i++) { Object element = percolateRequests.get(i); assert element != null; if (element instanceof PercolateRequest) { @@ -160,9 +170,9 @@ public class TransportMultiPercolateAction extends TransportAction entry : shardToSlotsBuilder.entrySet()) { + shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray())); + } + final AtomicInteger expectedOperations = new AtomicInteger(expectedResults); for (Map.Entry entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); @@ -219,8 +233,8 @@ public class TransportMultiPercolateAction extends TransportAction percolateRequests, + AtomicReferenceArray percolateRequests, AtomicInteger expectedOperations, AtomicArray reducedResponses, ActionListener listener, @@ -253,7 +267,7 @@ public class TransportMultiPercolateAction extends TransportAction= 1; + assert expectedOperations.get() >= 1 : "slot[" + slot + "] expected options should be >= 1 but is " + expectedOperations.get(); if (expectedOperations.decrementAndGet() == 0) { finish(reducedResponses, listener); } @@ -263,7 +277,7 @@ public class TransportMultiPercolateAction extends TransportAction