From 7fda12316a31f777e01453a43456f6005f94ad4e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 21 Aug 2013 23:57:15 +0200 Subject: [PATCH] Properly reduce in onFailure --- .../TransportMultiPercolateAction.java | 20 ++++++++++++++----- .../percolator/RecoveryPercolatorTests.java | 20 ++++++++++--------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java index 4876523b18a..aaadf603e46 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java @@ -136,6 +136,7 @@ public class TransportMultiPercolateAction extends TransportAction requestsByShard = new HashMap(); + final Map shardToSlots = new HashMap(); int expectedResults = 0; for (int i = 0; i < percolateRequests.size(); i++) { Object element = percolateRequests.get(i); @@ -158,6 +159,12 @@ public class TransportMultiPercolateAction extends TransportAction= 1; - if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) { - reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard); + assert expectedOperationsPerItem[slot].get() >= 1; + if (expectedOperationsPerItem[slot].decrementAndGet() == 0) { + reduce(slot, percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard); } } } catch (Throwable t) { diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java index 3cfb8f3fda3..61a3aa70d07 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java @@ -21,6 +21,8 @@ package org.elasticsearch.test.integration.percolator; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; import org.elasticsearch.action.percolate.MultiPercolateResponse; @@ -266,17 +268,17 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { @Test public void testSinglePercolator_recovery() throws Exception { - multiPercolatorRecovery(false); + percolatorRecovery(false); } @Test public void testMultiPercolator_recovery() throws Exception { - multiPercolatorRecovery(true); + percolatorRecovery(true); } // 3 nodes, 2 primary + 2 replicas per primary, so each node should have a copy of the data. // We only start and stop nodes 2 and 3, so all requests should succeed and never be partial. - private void multiPercolatorRecovery(final boolean multiPercolate) throws Exception { + private void percolatorRecovery(final boolean multiPercolate) throws Exception { Settings settings = settingsBuilder() .put("gateway.type", "none").build(); logger.info("--> starting 3 nodes"); @@ -321,7 +323,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { } catch (IOException e) {} while (run.get()) { - /*NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo() + NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo() .execute().actionGet(); String node2Id = null; String node3Id = null; @@ -333,7 +335,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { } } - String preference = "_prefer_node:" + (randomBoolean() ? node2Id : node3Id);*/ + String preference = "_prefer_node:" + (randomBoolean() ? node2Id : node3Id); if (multiPercolate) { MultiPercolateRequestBuilder builder = client() .prepareMultiPercolate(); @@ -342,7 +344,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { for (int i = 0; i < numPercolateRequest / 2; i++) { builder.add( client().preparePercolate() -// .setPreference(preference) + .setPreference(preference) .setIndices("test").setDocumentType("type") .setPercolateDoc(docBuilder().setDoc(doc))); } @@ -350,7 +352,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { for (int i = numPercolateRequest / 2; i < numPercolateRequest; i++) { builder.add( client().preparePercolate() -// .setPreference(preference) + .setPreference(preference) .setGetRequest(Requests.getRequest("test").type("type").id("1")) .setIndices("test").setDocumentType("type") ); @@ -370,13 +372,13 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { response = client().preparePercolate() .setIndices("test").setDocumentType("type") .setPercolateDoc(docBuilder().setDoc(doc)) -// .setPreference(preference) + .setPreference(preference) .execute().actionGet(); } else { response = client().preparePercolate() .setGetRequest(Requests.getRequest("test").type("type").id("1")) .setIndices("test").setDocumentType("type") -// .setPreference(preference) + .setPreference(preference) .execute().actionGet(); } assertNoFailures(response);