diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java index 8ea498b5af4..48a8e768830 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java @@ -112,7 +112,8 @@ public class TransportMultiPercolateAction extends TransportAction percolateRequests, - final ActionListener listener, ClusterState clusterState) { - final AtomicReferenceArray expectedOperationsPerItem = new AtomicReferenceArray(percolateRequests.length()); - final AtomicReferenceArray responsesByItemAndShard = new AtomicReferenceArray(multiPercolateRequest.requests().size()); - final AtomicArray reducedResponses = new AtomicArray(percolateRequests.length()); + private class ASyncAction { - // Resolving concrete indices and routing and grouping the requests by shard - Map requestsByShard = new HashMap(); - // Keep track what slots belong to what shard, in case a request to a shard fails on all copies - Map shardToSlotsBuilder = new HashMap(); - int expectedResults = 0; - for (int i = 0; i < percolateRequests.length(); i++) { - Object element = percolateRequests.get(i); - assert element != null; - if (element instanceof PercolateRequest) { - PercolateRequest percolateRequest = (PercolateRequest) element; - String[] concreteIndices = clusterState.metaData().concreteIndices(percolateRequest.indices(), percolateRequest.ignoreIndices(), true); - Map> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), multiPercolateRequest.indices()); - // TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction - GroupShardsIterator shards = clusterService.operationRouting().searchShards( - clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference() - ); + final ActionListener listener; + final Map requestsByShard; + final AtomicReferenceArray percolateRequests; - responsesByItemAndShard.set(i, new AtomicReferenceArray(shards.size())); - expectedOperationsPerItem.set(i, new AtomicInteger(shards.size())); - for (ShardIterator shard : shards) { - ShardId shardId = shard.shardId(); - TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId); - if (requests == null) { - requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference())); + final AtomicInteger expectedOperations; + final AtomicArray reducedResponses; + final ConcurrentMap shardToSlots; + final AtomicReferenceArray expectedOperationsPerItem; + final AtomicReferenceArray responsesByItemAndShard; + + ASyncAction(AtomicReferenceArray percolateRequests, ActionListener listener, ClusterState clusterState) { + this.listener = listener; + this.percolateRequests = percolateRequests; + responsesByItemAndShard = new AtomicReferenceArray(percolateRequests.length()); + expectedOperationsPerItem = new AtomicReferenceArray(percolateRequests.length()); + reducedResponses = new AtomicArray(percolateRequests.length()); + + // Resolving concrete indices and routing and grouping the requests by shard + requestsByShard = new HashMap(); + // Keep track what slots belong to what shard, in case a request to a shard fails on all copies + Map shardToSlotsBuilder = new HashMap(); + int expectedResults = 0; + for (int slot = 0; slot < percolateRequests.length(); slot++) { + Object element = percolateRequests.get(slot); + assert element != null; + if (element instanceof PercolateRequest) { + PercolateRequest percolateRequest = (PercolateRequest) element; + String[] concreteIndices = clusterState.metaData().concreteIndices(percolateRequest.indices(), percolateRequest.ignoreIndices(), true); + Map> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), percolateRequest.indices()); + // TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction + GroupShardsIterator shards = clusterService.operationRouting().searchShards( + clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference() + ); + + responsesByItemAndShard.set(slot, new AtomicReferenceArray(shards.size())); + expectedOperationsPerItem.set(slot, new AtomicInteger(shards.size())); + for (ShardIterator shard : shards) { + ShardId shardId = shard.shardId(); + TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId); + if (requests == null) { + requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference())); + } + requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest))); + + TIntArrayList items = shardToSlotsBuilder.get(shardId); + if (items == null) { + shardToSlotsBuilder.put(shardId, items = new TIntArrayList()); + } + items.add(slot); } - requests.add(new TransportShardMultiPercolateAction.Request.Item(i, new PercolateShardRequest(shardId, percolateRequest))); - - TIntArrayList items = shardToSlotsBuilder.get(shardId); - if (items == null) { - shardToSlotsBuilder.put(shardId, items = new TIntArrayList()); - } - items.add(i); + expectedResults++; + } else if (element instanceof Throwable || element instanceof MultiGetResponse.Failure) { + reducedResponses.set(slot, element); + responsesByItemAndShard.set(slot, new AtomicReferenceArray(0)); + expectedOperationsPerItem.set(slot, new AtomicInteger(0)); } - expectedResults++; - } else if (element instanceof Throwable) { - reducedResponses.set(i, element); - responsesByItemAndShard.set(i, new AtomicReferenceArray(0)); - expectedOperationsPerItem.set(i, new AtomicInteger(0)); + } + expectedOperations = new AtomicInteger(expectedResults); + // Move slot to shard tracking from normal map to concurrent save map + shardToSlots = ConcurrentCollections.newConcurrentMap(); + for (Map.Entry entry : shardToSlotsBuilder.entrySet()) { + shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray())); } } - if (expectedResults == 0) { - finish(reducedResponses, listener); - return; - } + void run() { + if (expectedOperations.get() == 0) { + finish(); + return; + } - // Move slot to shard tracking from normal map to concurrent save map - final ConcurrentMap shardToSlots = ConcurrentCollections.newConcurrentMap(); - for (Map.Entry entry : shardToSlotsBuilder.entrySet()) { - shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray())); - } + for (Map.Entry entry : requestsByShard.entrySet()) { + final int shardId = entry.getKey().id(); + final String index = entry.getKey().index().name(); - final AtomicInteger expectedOperations = new AtomicInteger(expectedResults); - for (Map.Entry entry : requestsByShard.entrySet()) { - final ShardId shardId = entry.getKey(); - final TransportShardMultiPercolateAction.Request shardRequest = entry.getValue(); - shardMultiPercolateAction.execute(shardRequest, new ActionListener() { + TransportShardMultiPercolateAction.Request shardRequest = entry.getValue(); + shardMultiPercolateAction.execute(shardRequest, new ActionListener() { - @Override - @SuppressWarnings("unchecked") - public void onResponse(TransportShardMultiPercolateAction.Response response) { - try { - for (TransportShardMultiPercolateAction.Response.Item item : response.items()) { - AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot()); - if (shardResults == null) { - continue; - } - - if (item.failed()) { - shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, item.error().string())); - } else { - shardResults.set(shardId.id(), item.response()); - } - - assert expectedOperationsPerItem.get(item.slot()).get() >= 1 : "slot[" + item.slot() + "] can't be lower than one"; - if (expectedOperationsPerItem.get(item.slot()).decrementAndGet() == 0) { - // Failure won't bubble up, since we fail the whole request now via the catch clause below, - // so expectedOperationsPerItem will not be decremented twice. - reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard); - } - } - } catch (Throwable e) { - logger.error("{} Percolate original reduce error", e, shardId); - listener.onFailure(e); + @Override + public void onResponse(TransportShardMultiPercolateAction.Response response) { + onShardResponse(new ShardId(index, shardId), response); } - } - @Override - @SuppressWarnings("unchecked") - public void onFailure(Throwable e) { - logger.debug("Shard multi percolate failure", e); - try { - AtomicIntegerArray slots = shardToSlots.get(shardId); - for (int i = 0; i < slots.length(); i++) { - int slot = slots.get(i); - AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot); - if (shardResults == null) { - continue; - } - - shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e)); - assert expectedOperationsPerItem.get(slot).get() >= 1 : "slot[" + slot + "] can't be lower than one. Caused by: " + e.getMessage(); - if (expectedOperationsPerItem.get(slot).decrementAndGet() == 0) { - reduce(slot, percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard); - } - } - } catch (Throwable t) { - logger.error("{} Percolate original reduce error, original error {}", t, shardId, e); - listener.onFailure(t); + @Override + public void onFailure(Throwable e) { + onShardFailure(new ShardId(index, shardId), e); } - } - }); - } - } - - private void reduce(int slot, - AtomicReferenceArray percolateRequests, - AtomicInteger expectedOperations, - AtomicArray reducedResponses, - ActionListener listener, - AtomicReferenceArray responsesByItemAndShard) { - - AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot); - PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService); - reducedResponses.set(slot, reducedResponse); - assert expectedOperations.get() >= 1 : "slot[" + slot + "] expected options should be >= 1 but is " + expectedOperations.get(); - if (expectedOperations.decrementAndGet() == 0) { - finish(reducedResponses, listener); - } - } - - private void finish(AtomicArray reducedResponses, ActionListener listener) { - MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()]; - for (int i = 0; i < reducedResponses.length(); i++) { - Object element = reducedResponses.get(i); - assert element != null : "Element[" + i + "] shouldn't be null"; - if (element instanceof PercolateResponse) { - finalResponse[i] = new MultiPercolateResponse.Item((PercolateResponse) element); - } else if (element instanceof Throwable) { - finalResponse[i] = new MultiPercolateResponse.Item(ExceptionsHelper.detailedMessage((Throwable) element)); + }); } } - listener.onResponse(new MultiPercolateResponse(finalResponse)); + + @SuppressWarnings("unchecked") + void onShardResponse(ShardId shardId, TransportShardMultiPercolateAction.Response response) { + try { + for (TransportShardMultiPercolateAction.Response.Item item : response.items()) { + AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot()); + if (shardResults == null) { + assert false : "shardResults can't be null"; + continue; + } + + if (item.failed()) { + shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, item.error().string())); + } else { + shardResults.set(shardId.id(), item.response()); + } + + assert expectedOperationsPerItem.get(item.slot()).get() >= 1 : "slot[" + item.slot() + "] can't be lower than one"; + if (expectedOperationsPerItem.get(item.slot()).decrementAndGet() == 0) { + // Failure won't bubble up, since we fail the whole request now via the catch clause below, + // so expectedOperationsPerItem will not be decremented twice. + reduce(item.slot()); + } + } + } catch (Throwable e) { + logger.error("{} Percolate original reduce error", e, shardId); + listener.onFailure(e); + } + } + + @SuppressWarnings("unchecked") + void onShardFailure(ShardId shardId, Throwable e) { + logger.debug("Shard multi percolate failure", e); + try { + AtomicIntegerArray slots = shardToSlots.get(shardId); + for (int i = 0; i < slots.length(); i++) { + int slot = slots.get(i); + AtomicReferenceArray shardResults = responsesByItemAndShard.get(slot); + if (shardResults == null) { + continue; + } + + shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e)); + assert expectedOperationsPerItem.get(slot).get() >= 1 : "slot[" + slot + "] can't be lower than one. Caused by: " + e.getMessage(); + if (expectedOperationsPerItem.get(slot).decrementAndGet() == 0) { + reduce(slot); + } + } + } catch (Throwable t) { + logger.error("{} Percolate original reduce error, original error {}", t, shardId, e); + listener.onFailure(t); + } + } + + void reduce(int slot) { + AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot); + PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService); + reducedResponses.set(slot, reducedResponse); + assert expectedOperations.get() >= 1 : "slot[" + slot + "] expected options should be >= 1 but is " + expectedOperations.get(); + if (expectedOperations.decrementAndGet() == 0) { + finish(); + } + } + + void finish() { + MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()]; + for (int slot = 0; slot < reducedResponses.length(); slot++) { + Object element = reducedResponses.get(slot); + assert element != null : "Element[" + slot + "] shouldn't be null"; + if (element instanceof PercolateResponse) { + finalResponse[slot] = new MultiPercolateResponse.Item((PercolateResponse) element); + } else if (element instanceof Throwable) { + finalResponse[slot] = new MultiPercolateResponse.Item(ExceptionsHelper.detailedMessage((Throwable) element)); + } else if (element instanceof MultiGetResponse.Failure) { + finalResponse[slot] = new MultiPercolateResponse.Item(((MultiGetResponse.Failure)element).getMessage()); + } + } + listener.onResponse(new MultiPercolateResponse(finalResponse)); + } + } diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportShardMultiPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportShardMultiPercolateAction.java index b738fd90e28..f2f2422404c 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportShardMultiPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportShardMultiPercolateAction.java @@ -99,16 +99,16 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper Response response = new Response(); response.items = new ArrayList(request.items.size()); for (Request.Item item : request.items) { - Response.Item responseItem = new Response.Item(); - responseItem.slot = item.slot; + Response.Item responseItem; + int slot = item.slot; try { - responseItem.response = percolatorService.percolate(item.request); + responseItem = new Response.Item(slot, percolatorService.percolate(item.request)); } catch (Throwable e) { logger.debug("[{}][{}] failed to multi percolate", e, request.index(), request.shardId()); if (TransportActions.isShardNotAvailableException(e)) { throw new ElasticSearchException("", e); } else { - responseItem.error = new StringText(ExceptionsHelper.detailedMessage(e)); + responseItem = new Response.Item(slot, new StringText(ExceptionsHelper.detailedMessage(e))); } } response.items.add(responseItem); @@ -123,6 +123,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper private String preference; private List items; + private volatile boolean done = false; + public Request() { } @@ -153,13 +155,13 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper int size = in.readVInt(); items = new ArrayList(size); for (int i = 0; i < size; i++) { - Item item = new Item(); - item.slot = in.readVInt(); - item.request = new PercolateShardRequest(index(), shardId); - item.request.documentType(in.readString()); - item.request.source(in.readBytesReference()); - item.request.docSource(in.readBytesReference()); - item.request.onlyCount(in.readBoolean()); + int slot = in.readVInt(); + PercolateShardRequest shardRequest = new PercolateShardRequest(index(), shardId); + shardRequest.documentType(in.readString()); + shardRequest.source(in.readBytesReference()); + shardRequest.docSource(in.readBytesReference()); + shardRequest.onlyCount(in.readBoolean()); + Item item = new Item(slot, shardRequest); items.add(item); } } @@ -181,11 +183,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper public static class Item { - private int slot; - private PercolateShardRequest request; - - Item() { - } + private final int slot; + private final PercolateShardRequest request; public Item(int slot, PercolateShardRequest request) { this.slot = slot; @@ -234,23 +233,34 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper int size = in.readVInt(); items = new ArrayList(size); for (int i = 0; i < size; i++) { - Item item = new Item(); - item.slot = in.readVInt(); + int slot = in.readVInt(); if (in.readBoolean()) { - item.response = new PercolateShardResponse(); - item.response.readFrom(in); + PercolateShardResponse shardResponse = new PercolateShardResponse(); + shardResponse.readFrom(in); + items.add(new Item(slot, shardResponse)); } else { - item.error = in.readText(); + items.add(new Item(slot, in.readText())); } - items.add(item); } } public static class Item { - private int slot; - private PercolateShardResponse response; - private Text error; + private final int slot; + private final PercolateShardResponse response; + private final Text error; + + public Item(Integer slot, PercolateShardResponse response) { + this.slot = slot; + this.response = response; + this.error = null; + } + + public Item(Integer slot, Text error) { + this.slot = slot; + this.error = error; + this.response = null; + } public int slot() { return slot;