Properly reduce in onFailure

Martijn van Groningen 2013-08-21 23:57:15 +02:00
parent 352d2aaf18
commit 7fda12316a
2 changed files with 26 additions and 14 deletions
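
In outline: while grouping the percolate requests by shard, the action now also records which slots (positions in the original multi request) were routed to each shard, in the new shardToSlots map. On a shard-level failure, onFailure uses that map to mark the failure for every affected slot, decrement each slot's pending-operation counter, and reduce a slot as soon as its counter reaches zero. A minimal sketch of that bookkeeping, with plain java.util collections standing in for Trove's TIntArrayList and for the real transport types:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: simplified stand-ins for ShardId, TIntArrayList and the
// AtomicReferenceArray-based response table used by the real action.
class MultiPercolateBookkeeping {

    // shard -> slots of the original multi-percolate request routed to it
    private final Map<String, List<Integer>> shardToSlots = new HashMap<String, List<Integer>>();
    // per-slot countdown of shard responses still outstanding
    private final AtomicInteger[] expectedOperationsPerItem;

    MultiPercolateBookkeeping(int numSlots) {
        expectedOperationsPerItem = new AtomicInteger[numSlots];
        for (int i = 0; i < numSlots; i++) {
            expectedOperationsPerItem[i] = new AtomicInteger(0);
        }
    }

    // Called while grouping requests by shard (mirrors the first two hunks).
    void route(String shardId, int slot) {
        List<Integer> slots = shardToSlots.get(shardId);
        if (slots == null) {
            shardToSlots.put(shardId, slots = new ArrayList<Integer>());
        }
        slots.add(slot);
        expectedOperationsPerItem[slot].incrementAndGet();
    }

    // Called from onFailure: fan the shard failure out to every slot that
    // targeted the failed shard, reducing each slot whose countdown ends.
    void onShardFailure(String shardId, Throwable e) {
        for (int slot : shardToSlots.get(shardId)) {
            if (expectedOperationsPerItem[slot].decrementAndGet() == 0) {
                reduce(slot); // all shards for this slot responded or failed
            }
        }
    }

    void reduce(int slot) {
        // merge the per-shard results (or failures) recorded for this slot
    }
}

The real handler additionally records the failure in the per-slot response table (as a BroadcastShardOperationFailedException) before decrementing, as the fourth hunk below shows.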

src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java

@@ -136,6 +136,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
// Resolving concrete indices and routing and grouping the requests by shard
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
+ final Map<ShardId, TIntArrayList> shardToSlots = new HashMap<ShardId, TIntArrayList>();
int expectedResults = 0;
for (int i = 0; i < percolateRequests.size(); i++) {
Object element = percolateRequests.get(i);
@@ -158,6 +159,12 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference()));
}
requests.add(new TransportShardMultiPercolateAction.Request.Item(i, new PercolateShardRequest(shardId, percolateRequest)));
+ TIntArrayList items = shardToSlots.get(shardId);
+ if (items == null) {
+ shardToSlots.put(shardId, items = new TIntArrayList());
+ }
+ items.add(i);
}
expectedResults++;
} else if (element instanceof Throwable) {
@@ -200,6 +207,7 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
}
}
} catch (Throwable e) {
+ logger.error("{} Percolate original reduce error", e, shardId);
listener.onFailure(e);
}
}
@@ -208,16 +216,18 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
@SuppressWarnings("unchecked")
public void onFailure(Throwable e) {
try {
- for (TransportShardMultiPercolateAction.Request.Item item : shardRequest.items()) {
- AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot());
+ TIntArrayList slots = shardToSlots.get(shardId);
+ for (int i = 0; i < slots.size(); i++) {
+ int slot = slots.get(i);
+ AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot);
if (shardResults == null) {
continue;
}
shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e));
- assert expectedOperationsPerItem[item.slot()].get() >= 1;
- if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) {
- reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
+ assert expectedOperationsPerItem[slot].get() >= 1;
+ if (expectedOperationsPerItem[slot].decrementAndGet() == 0) {
+ reduce(slot, percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
}
}
} catch (Throwable t) {

src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java

@@ -21,6 +21,8 @@ package org.elasticsearch.test.integration.percolator;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder;
import org.elasticsearch.action.percolate.MultiPercolateResponse;
@@ -266,17 +268,17 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
@Test
public void testSinglePercolator_recovery() throws Exception {
- multiPercolatorRecovery(false);
+ percolatorRecovery(false);
}
@Test
public void testMultiPercolator_recovery() throws Exception {
- multiPercolatorRecovery(true);
+ percolatorRecovery(true);
}
// 3 nodes, 2 primary + 2 replicas per primary, so each node should have a copy of the data.
// We only start and stop nodes 2 and 3, so all requests should succeed and never be partial.
- private void multiPercolatorRecovery(final boolean multiPercolate) throws Exception {
+ private void percolatorRecovery(final boolean multiPercolate) throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "none").build();
logger.info("--> starting 3 nodes");
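
The comment above relies on every node holding a full copy of the data: 2 primaries with 2 replicas each gives 6 shard copies, and spread over 3 nodes that is 2 copies per node, i.e. both shards everywhere. The create-index call itself is outside the hunks shown; a hypothetical sketch of settings that would produce that layout:

// Hypothetical: index settings matching "2 primary + 2 replicas per primary".
// 2 x (1 primary + 2 replicas) = 6 shard copies over 3 nodes, so every node
// ends up with a copy of both shards.
client().admin().indices().prepareCreate("test")
        .setSettings(settingsBuilder()
                .put("index.number_of_shards", 2)
                .put("index.number_of_replicas", 2))
        .execute().actionGet();

With that layout, stopping node 2 or node 3 never removes the last copy of a shard, which is why the test can insist that no response is partial.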
@@ -321,7 +323,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
} catch (IOException e) {}
while (run.get()) {
- /*NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo()
+ NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo()
.execute().actionGet();
String node2Id = null;
String node3Id = null;
@@ -333,7 +335,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
}
}
- String preference = "_prefer_node:" + (randomBoolean() ? node2Id : node3Id);*/
+ String preference = "_prefer_node:" + (randomBoolean() ? node2Id : node3Id);
if (multiPercolate) {
MultiPercolateRequestBuilder builder = client()
.prepareMultiPercolate();
@@ -342,7 +344,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
for (int i = 0; i < numPercolateRequest / 2; i++) {
builder.add(
client().preparePercolate()
- // .setPreference(preference)
+ .setPreference(preference)
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(doc)));
}
@@ -350,7 +352,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
for (int i = numPercolateRequest / 2; i < numPercolateRequest; i++) {
builder.add(
client().preparePercolate()
- // .setPreference(preference)
+ .setPreference(preference)
.setGetRequest(Requests.getRequest("test").type("type").id("1"))
.setIndices("test").setDocumentType("type")
);
@@ -370,13 +372,13 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
response = client().preparePercolate()
.setIndices("test").setDocumentType("type")
.setPercolateDoc(docBuilder().setDoc(doc))
- // .setPreference(preference)
+ .setPreference(preference)
.execute().actionGet();
} else {
response = client().preparePercolate()
.setGetRequest(Requests.getRequest("test").type("type").id("1"))
.setIndices("test").setDocumentType("type")
- // .setPreference(preference)
+ .setPreference(preference)
.execute().actionGet();
}
assertNoFailures(response);
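
The hunks above show only the closing braces of the loop that fills node2Id and node3Id from the NodesInfoResponse; its body is elided. A plausible reconstruction, assuming the two restartable nodes were started under the names "node2" and "node3" (the node names do not appear in this diff):

// Hypothetical reconstruction of the elided loop: resolve the ids of the
// two restartable nodes so that _prefer_node can target one of them.
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo()
        .execute().actionGet();
String node2Id = null;
String node3Id = null;
for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
    if ("node2".equals(nodeInfo.getNode().getName())) {
        node2Id = nodeInfo.getNode().getId();
    } else if ("node3".equals(nodeInfo.getNode().getName())) {
        node3Id = nodeInfo.getNode().getId();
    }
}
String preference = "_prefer_node:" + (randomBoolean() ? node2Id : node3Id);

Preferring the nodes that are being restarted is presumably the point: it steers requests at the shard copies most likely to fail mid-flight, exercising the onFailure path fixed in the main change.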