diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java index 48a8e768830..5ed98a5024d 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.percolator.PercolatorService; @@ -43,9 +42,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; import java.util.*; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReferenceArray; /** @@ -76,22 +73,20 @@ public class TransportMultiPercolateAction extends TransportAction percolateRequests = new AtomicReferenceArray(request.requests().size()); - TIntArrayList getRequestSlots = new TIntArrayList(); - List existingDocsRequests = new ArrayList(); - for (int i = 0; i < request.requests().size(); i++) { - PercolateRequest percolateRequest = request.requests().get(i); - percolateRequest.startTime = System.currentTimeMillis(); - percolateRequests.set(i, percolateRequest); - if (percolateRequest.getRequest() != null) { - existingDocsRequests.add(percolateRequest.getRequest()); - getRequestSlots.add(i); - } - } - + final List percolateRequests = new ArrayList(request.requests().size()); // Can have a mixture of percolate requests. (normal percolate requests & percolate existing doc), // so we need to keep track for what percolate request we had a get request - final AtomicIntegerArray percolateRequestSlotsWithGet = new AtomicIntegerArray(getRequestSlots.toArray()); + final TIntArrayList getRequestSlots = new TIntArrayList(); + List existingDocsRequests = new ArrayList(); + for (int slot = 0; slot < request.requests().size(); slot++) { + PercolateRequest percolateRequest = request.requests().get(slot); + percolateRequest.startTime = System.currentTimeMillis(); + percolateRequests.add(percolateRequest); + if (percolateRequest.getRequest() != null) { + existingDocsRequests.add(percolateRequest.getRequest()); + getRequestSlots.add(slot); + } + } if (!existingDocsRequests.isEmpty()) { final MultiGetRequest multiGetRequest = new MultiGetRequest(); @@ -108,16 +103,18 @@ public class TransportMultiPercolateAction extends TransportAction listener; + final ActionListener finalListener; final Map requestsByShard; - final AtomicReferenceArray percolateRequests; + final List percolateRequests; + final Map shardToSlots; final AtomicInteger expectedOperations; final AtomicArray reducedResponses; - final ConcurrentMap shardToSlots; final AtomicReferenceArray expectedOperationsPerItem; final AtomicReferenceArray responsesByItemAndShard; - ASyncAction(AtomicReferenceArray percolateRequests, ActionListener listener, ClusterState clusterState) { - this.listener = listener; + ASyncAction(List percolateRequests, ActionListener finalListener, ClusterState clusterState) { + this.finalListener = finalListener; this.percolateRequests = percolateRequests; - responsesByItemAndShard = new AtomicReferenceArray(percolateRequests.length()); - expectedOperationsPerItem = new AtomicReferenceArray(percolateRequests.length()); - reducedResponses = new AtomicArray(percolateRequests.length()); + responsesByItemAndShard = new AtomicReferenceArray(percolateRequests.size()); + expectedOperationsPerItem = new AtomicReferenceArray(percolateRequests.size()); + reducedResponses = new AtomicArray(percolateRequests.size()); // Resolving concrete indices and routing and grouping the requests by shard requestsByShard = new HashMap(); // Keep track what slots belong to what shard, in case a request to a shard fails on all copies - Map shardToSlotsBuilder = new HashMap(); + shardToSlots = new HashMap(); int expectedResults = 0; - for (int slot = 0; slot < percolateRequests.length(); slot++) { + for (int slot = 0; slot < percolateRequests.size(); slot++) { Object element = percolateRequests.get(slot); assert element != null; if (element instanceof PercolateRequest) { @@ -169,7 +166,7 @@ public class TransportMultiPercolateAction extends TransportAction> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), percolateRequest.indices()); // TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction GroupShardsIterator shards = clusterService.operationRouting().searchShards( - clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference() + clusterState, percolateRequest.indices(), concreteIndices, routing, null ); responsesByItemAndShard.set(slot, new AtomicReferenceArray(shards.size())); @@ -180,27 +177,24 @@ public class TransportMultiPercolateAction extends TransportAction entry : shardToSlotsBuilder.entrySet()) { - shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray())); - } } void run() { @@ -209,21 +203,20 @@ public class TransportMultiPercolateAction extends TransportAction entry : requestsByShard.entrySet()) { - final int shardId = entry.getKey().id(); - final String index = entry.getKey().index().name(); - + final ShardId shardId = entry.getKey(); TransportShardMultiPercolateAction.Request shardRequest = entry.getValue(); shardMultiPercolateAction.execute(shardRequest, new ActionListener() { @Override public void onResponse(TransportShardMultiPercolateAction.Response response) { - onShardResponse(new ShardId(index, shardId), response); + onShardResponse(shardId, response); } @Override public void onFailure(Throwable e) { - onShardFailure(new ShardId(index, shardId), e); + onShardFailure(shardId, e); } }); @@ -232,6 +225,7 @@ public class TransportMultiPercolateAction extends TransportAction items; - private volatile boolean done = false; - public Request() { } diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java index 0b990650d0d..6479a63a6e9 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.junit.annotations.TestLogging; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.test.integration.AbstractNodesTests; import org.junit.After; @@ -272,6 +273,8 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { } @Test + // Need to omit org.elast + @TestLogging("action.percolate:TRACE") public void testMultiPercolator_recovery() throws Exception { percolatorRecovery(true); }