diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index b9d4508657b..894459de773 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -120,10 +120,7 @@ import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.mlt.MoreLikeThisAction; import org.elasticsearch.action.mlt.TransportMoreLikeThisAction; -import org.elasticsearch.action.percolate.MultiPercolateAction; -import org.elasticsearch.action.percolate.PercolateAction; -import org.elasticsearch.action.percolate.TransportMultiPercolateAction; -import org.elasticsearch.action.percolate.TransportPercolateAction; +import org.elasticsearch.action.percolate.*; import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.type.*; import org.elasticsearch.action.suggest.SuggestAction; @@ -253,7 +250,7 @@ public class ActionModule extends AbstractModule { registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class); registerAction(MoreLikeThisAction.INSTANCE, TransportMoreLikeThisAction.class); registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class); - registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class); + registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class); registerAction(ExplainAction.INSTANCE, TransportExplainAction.class); // register Name -> GenericAction Map that can be injected to instances. diff --git a/src/main/java/org/elasticsearch/action/percolate/MultiPercolateResponse.java b/src/main/java/org/elasticsearch/action/percolate/MultiPercolateResponse.java index d963cc409af..e95903e0893 100644 --- a/src/main/java/org/elasticsearch/action/percolate/MultiPercolateResponse.java +++ b/src/main/java/org/elasticsearch/action/percolate/MultiPercolateResponse.java @@ -41,6 +41,7 @@ public class MultiPercolateResponse extends ActionResponse implements Iterable { - private final TransportPercolateAction percolateAction; private final ClusterService clusterService; + private final PercolatorService percolatorService; + + private final TransportMultiGetAction multiGetAction; + private final TransportShardMultiPercolateAction shardMultiPercolateAction; @Inject - public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportPercolateAction percolateAction, ClusterService clusterService, TransportService transportService) { + public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportShardMultiPercolateAction shardMultiPercolateAction, + ClusterService clusterService, TransportService transportService, PercolatorService percolatorService, + TransportMultiGetAction multiGetAction) { super(settings, threadPool); - this.percolateAction = percolateAction; + this.shardMultiPercolateAction = shardMultiPercolateAction; this.clusterService = clusterService; + this.percolatorService = percolatorService; + this.multiGetAction = multiGetAction; transportService.registerHandler(MultiPercolateAction.NAME, new TransportHandler()); } @Override - protected void doExecute(MultiPercolateRequest request, final ActionListener listener) { - ClusterState clusterState = clusterService.state(); + protected void doExecute(final MultiPercolateRequest request, final ActionListener listener) { + final ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + @SuppressWarnings("unchecked") + final List percolateRequests = (List) request.requests(); + + final TIntArrayList slots = new TIntArrayList(); + final List existingDocsRequests = new ArrayList(); + for (int i = 0; i < request.requests().size(); i++) { + PercolateRequest percolateRequest = request.requests().get(i); + percolateRequest.startTime = System.currentTimeMillis(); + if (percolateRequest.getRequest() != null) { + existingDocsRequests.add(percolateRequest.getRequest()); + slots.add(i); + } + } + + if (!existingDocsRequests.isEmpty()) { + final MultiGetRequest multiGetRequest = new MultiGetRequest(); + for (GetRequest getRequest : existingDocsRequests) { + multiGetRequest.add( + new MultiGetRequest.Item(getRequest.index(), getRequest.type(), getRequest.id()) + .routing(getRequest.routing()) + ); + } + + multiGetAction.execute(multiGetRequest, new ActionListener() { - final MultiPercolateResponse.Item[] responses = new MultiPercolateResponse.Item[request.requests().size()]; - final AtomicInteger counter = new AtomicInteger(responses.length); - for (int i = 0; i < responses.length; i++) { - final int index = i; - percolateAction.execute(request.requests().get(i), new ActionListener() { @Override - public void onResponse(PercolateResponse percolateResponse) { - synchronized (responses) { - responses[index] = new MultiPercolateResponse.Item(percolateResponse, null); - } - if (counter.decrementAndGet() == 0) { - finishHim(); + public void onResponse(MultiGetResponse multiGetItemResponses) { + for (int i = 0; i < multiGetItemResponses.getResponses().length; i++) { + MultiGetItemResponse itemResponse = multiGetItemResponses.getResponses()[i]; + int slot = slots.get(i); + if (!itemResponse.isFailed()) { + GetResponse getResponse = itemResponse.getResponse(); + if (getResponse.isExists()) { + percolateRequests.set(slot, new PercolateRequest((PercolateRequest) percolateRequests.get(slot), getResponse.getSourceAsBytesRef())); + } else { + percolateRequests.set(slot, new DocumentMissingException(null, getResponse.getType(), getResponse.getId())); + } + } else { + percolateRequests.set(slot, itemResponse.getFailure()); + } } + multiPercolate(request, percolateRequests, listener, clusterState); } @Override public void onFailure(Throwable e) { - synchronized (responses) { - responses[index] = new MultiPercolateResponse.Item(null, ExceptionsHelper.detailedMessage(e)); + listener.onFailure(e); + } + }); + } else { + multiPercolate(request, percolateRequests, listener, clusterState); + } + + } + + private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final List percolateRequests, + final ActionListener listener, ClusterState clusterState) { + + final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.size()]; + final AtomicReferenceArray responsesByItemAndShard = new AtomicReferenceArray(multiPercolateRequest.requests().size()); + final AtomicArray reducedResponses = new AtomicArray(percolateRequests.size()); + + // Resolving concrete indices and routing and grouping the requests by shard + final Map requestsByShard = new HashMap(); + int expectedResults = 0; + for (int i = 0; i < percolateRequests.size(); i++) { + Object element = percolateRequests.get(i); + assert element != null; + if (element instanceof PercolateRequest) { + PercolateRequest percolateRequest = (PercolateRequest) element; + String[] concreteIndices = clusterState.metaData().concreteIndices(percolateRequest.indices(), percolateRequest.ignoreIndices(), true); + Map> routing = clusterState.metaData().resolveSearchRouting(percolateRequest.routing(), multiPercolateRequest.indices()); + // TODO: I only need shardIds, ShardIterator(ShardRouting) is only needed in TransportShardMultiPercolateAction + GroupShardsIterator shards = clusterService.operationRouting().searchShards( + clusterState, percolateRequest.indices(), concreteIndices, routing, percolateRequest.preference() + ); + + responsesByItemAndShard.set(i, new AtomicReferenceArray(shards.size())); + expectedOperationsPerItem[i] = new AtomicInteger(shards.size()); + for (ShardIterator shard : shards) { + ShardId shardId = shard.shardId(); + TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId); + if (requests == null) { + requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shard.shardId().getIndex(), shardId.id(), percolateRequest.preference())); } - if (counter.decrementAndGet() == 0) { - finishHim(); + requests.add(new TransportShardMultiPercolateAction.Request.Item(i, new PercolateShardRequest(shardId, percolateRequest))); + } + expectedResults++; + } else if (element instanceof Throwable) { + reducedResponses.set(i, element); + responsesByItemAndShard.set(i, new AtomicReferenceArray(0)); + expectedOperationsPerItem[i] = new AtomicInteger(0); + } + } + + if (expectedResults == 0) { + finish(reducedResponses, listener); + return; + } + + final AtomicInteger expectedOperations = new AtomicInteger(expectedResults); + for (Map.Entry entry : requestsByShard.entrySet()) { + final ShardId shardId = entry.getKey(); + final TransportShardMultiPercolateAction.Request shardRequest = entry.getValue(); + shardMultiPercolateAction.execute(shardRequest, new ActionListener() { + + @Override + @SuppressWarnings("unchecked") + public void onResponse(TransportShardMultiPercolateAction.Response response) { + try { + for (TransportShardMultiPercolateAction.Response.Item item : response.items()) { + AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot()); + if (shardResults == null) { + continue; + } + + if (item.failed()) { + shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, item.error().string())); + } else { + shardResults.set(shardId.id(), item.response()); + } + + assert expectedOperationsPerItem[item.slot()].get() >= 1; + if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) { + reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard); + } + } + } catch (Throwable e) { + listener.onFailure(e); } } - private void finishHim() { - listener.onResponse(new MultiPercolateResponse(responses)); + @Override + @SuppressWarnings("unchecked") + public void onFailure(Throwable e) { + try { + for (TransportShardMultiPercolateAction.Request.Item item : shardRequest.items()) { + AtomicReferenceArray shardResults = responsesByItemAndShard.get(item.slot()); + if (shardResults == null) { + continue; + } + + shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e)); + assert expectedOperationsPerItem[item.slot()].get() >= 1; + if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) { + reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard); + } + } + } catch (Throwable t) { + logger.error("{} Percolate original reduce error", e, shardId); + listener.onFailure(t); + } } + }); } } + private void reduce(int slot, + List percolateRequests, + AtomicInteger expectedOperations, + AtomicArray reducedResponses, + ActionListener listener, + AtomicReferenceArray responsesByItemAndShard) { + + AtomicReferenceArray shardResponses = responsesByItemAndShard.get(slot); + PercolateResponse reducedResponse = TransportPercolateAction.reduce((PercolateRequest) percolateRequests.get(slot), shardResponses, percolatorService); + reducedResponses.set(slot, reducedResponse); + assert expectedOperations.get() >= 1; + if (expectedOperations.decrementAndGet() == 0) { + finish(reducedResponses, listener); + } + } + + private void finish(AtomicArray reducedResponses, ActionListener listener) { + MultiPercolateResponse.Item[] finalResponse = new MultiPercolateResponse.Item[reducedResponses.length()]; + for (int i = 0; i < reducedResponses.length(); i++) { + Object element = reducedResponses.get(i); + assert element != null; + if (element instanceof PercolateResponse) { + finalResponse[i] = new MultiPercolateResponse.Item((PercolateResponse) element); + } else if (element instanceof Throwable) { + finalResponse[i] = new MultiPercolateResponse.Item(ExceptionsHelper.detailedMessage((Throwable) element)); + } + } + listener.onResponse(new MultiPercolateResponse(finalResponse)); + } + + class TransportHandler extends BaseTransportRequestHandler { @Override @@ -94,7 +287,7 @@ public class TransportMultiPercolateAction extends TransportAction { + + private final PercolatorService percolatorService; + + @Inject + public TransportShardMultiPercolateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, PercolatorService percolatorService) { + super(settings, threadPool, clusterService, transportService); + this.percolatorService = percolatorService; + } + + @Override + protected String transportAction() { + return "mpercolate/shard"; + } + + @Override + protected String executor() { + return ThreadPool.Names.PERCOLATE; + } + + @Override + protected Request newRequest() { + return new Request(); + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) { + return state.blocks().indexBlockedException(ClusterBlockLevel.READ, request.index()); + } + + @Override + protected ShardIterator shards(ClusterState state, Request request) throws ElasticSearchException { + return clusterService.operationRouting().getShards( + clusterService.state(), request.index(), request.shardId(), request.preference + ); + } + + @Override + protected Response shardOperation(Request request, int shardId) throws ElasticSearchException { + // TODO: Look into combining the shard req's docs into one in memory index. + Response response = new Response(); + response.items = new ArrayList(request.items.size()); + for (Request.Item item : request.items) { + Response.Item responseItem = new Response.Item(); + responseItem.slot = item.slot; + try { + responseItem.response = percolatorService.percolate(item.request); + } catch (Throwable e) { + logger.trace("[{}][{}] failed to multi percolate", e, request.index(), request.shardId()); + if (TransportActions.isShardNotAvailableException(e)) { + throw new ElasticSearchException("", e); + } else { + responseItem.error = new StringText(ExceptionsHelper.detailedMessage(e)); + } + } + response.items.add(responseItem); + } + return response; + } + + + public static class Request extends SingleShardOperationRequest { + + private int shardId; + private String preference; + private List items; + + public Request() { + } + + public Request(String concreteIndex, int shardId, String preference) { + this.index = concreteIndex; + this.shardId = shardId; + this.preference = preference; + this.items = new ArrayList(); + } + + public int shardId() { + return shardId; + } + + public void add(Item item) { + items.add(item); + } + + public List items() { + return items; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = in.readVInt(); + preference = in.readOptionalString(); + int size = in.readVInt(); + items = new ArrayList(size); + for (int i = 0; i < size; i++) { + Item item = new Item(); + item.slot = in.readVInt(); + item.request = new PercolateShardRequest(index(), shardId); + item.request.documentType(in.readString()); + item.request.source(in.readBytesReference()); + item.request.docSource(in.readBytesReference()); + item.request.onlyCount(in.readBoolean()); + items.add(item); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shardId); + out.writeOptionalString(preference); + out.writeVInt(items.size()); + for (Item item : items) { + out.writeVInt(item.slot); + out.writeString(item.request.documentType()); + out.writeBytesReference(item.request.source()); + out.writeBytesReference(item.request.docSource()); + out.writeBoolean(item.request.onlyCount()); + } + } + + public static class Item { + + private int slot; + private PercolateShardRequest request; + + Item() { + } + + public Item(int slot, PercolateShardRequest request) { + this.slot = slot; + this.request = request; + } + + public int slot() { + return slot; + } + + public PercolateShardRequest request() { + return request; + } + + } + + } + + public static class Response extends ActionResponse { + + private List items; + + public List items() { + return items; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(items.size()); + for (Item item : items) { + out.writeVInt(item.slot); + if (item.response != null) { + out.writeBoolean(true); + item.response.writeTo(out); + } else { + out.writeBoolean(false); + out.writeText(item.error); + } + } + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + items = new ArrayList(size); + for (int i = 0; i < size; i++) { + Item item = new Item(); + item.slot = in.readVInt(); + if (in.readBoolean()) { + item.response = new PercolateShardResponse(); + item.response.readFrom(in); + } else { + item.error = in.readText(); + } + items.add(item); + } + } + + public static class Item { + + private int slot; + private PercolateShardResponse response; + private Text error; + + public int slot() { + return slot; + } + + public PercolateShardResponse response() { + return response; + } + + public Text error() { + return error; + } + + public boolean failed() { + return error != null; + } + } + + } + +} diff --git a/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java b/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java index 020b7f10c22..0824205b937 100644 --- a/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationRequest.java @@ -42,6 +42,11 @@ public abstract class BroadcastShardOperationRequest extends TransportRequest { this.shardId = shardId; } + public BroadcastShardOperationRequest(String index, int shardId) { + this.index = index; + this.shardId = shardId; + } + public String index() { return this.index; } diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 952d66d13ec..8fb03608d1b 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -217,10 +217,11 @@ public class PercolatorService extends AbstractComponent { XContentParser parser = null; // Some queries (function_score query when for decay functions) rely on SearchContext being set: - SearchContext.setCurrent(new SearchContext(0, + SearchContext searchContext = new SearchContext(0, new ShardSearchRequest().types(new String[0]), null, context.indexShard.searcher(), context.percolateIndexService, context.indexShard, - null, null)); + null, null); + SearchContext.setCurrent(searchContext); try { parser = XContentFactory.xContent(source).createParser(source); String currentFieldName = null; @@ -271,7 +272,7 @@ public class PercolatorService extends AbstractComponent { } catch (IOException e) { throw new ElasticSearchParseException("failed to parse request", e); } finally { - SearchContext.current().release(); + searchContext.release(); SearchContext.removeCurrent(); if (parser != null) { parser.close(); diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/MultiPercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/MultiPercolatorTests.java index a31a7d436a7..f522207b63d 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/MultiPercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/MultiPercolatorTests.java @@ -18,14 +18,18 @@ package org.elasticsearch.test.integration.percolator; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; import org.elasticsearch.action.percolate.MultiPercolateResponse; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.junit.Test; import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.*; import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.integration.percolator.SimplePercolatorTests.convertFromTextArray; import static org.hamcrest.Matchers.*; @@ -80,6 +84,8 @@ public class MultiPercolatorTests extends AbstractSharedClusterTest { item = response.getItems()[1]; assertThat(item.errorMessage(), nullValue()); + + assertNoFailures(item.response()); assertThat(item.getResponse().getMatches(), arrayWithSize(2)); assertThat(item.getResponse().getMatches(), arrayWithSize(2)); assertThat(item.getResponse().getCount(), equalTo(2l)); @@ -87,12 +93,14 @@ public class MultiPercolatorTests extends AbstractSharedClusterTest { item = response.getItems()[2]; assertThat(item.errorMessage(), nullValue()); + assertNoFailures(item.response()); assertThat(item.getResponse().getMatches(), arrayWithSize(4)); assertThat(item.getResponse().getCount(), equalTo(4l)); assertThat(convertFromTextArray(item.getResponse().getMatches(), "test"), arrayContainingInAnyOrder("1", "2", "3", "4")); item = response.getItems()[3]; assertThat(item.errorMessage(), nullValue()); + assertNoFailures(item.response()); assertThat(item.getResponse().getMatches(), arrayWithSize(1)); assertThat(item.getResponse().getCount(), equalTo(1l)); assertThat(convertFromTextArray(item.getResponse().getMatches(), "test"), arrayContaining("4")); @@ -103,4 +111,162 @@ public class MultiPercolatorTests extends AbstractSharedClusterTest { assertThat(item.errorMessage(), containsString("document missing")); } + @Test + public void testExistingDocsOnly() throws Exception { + client().admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + .build()) + .execute().actionGet(); + ensureGreen(); + + int numQueries = randomIntBetween(50, 100); + logger.info("--> register a queries"); + for (int i = 0; i < numQueries; i++) { + client().prepareIndex("test", "_percolator", Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject()) + .execute().actionGet(); + } + + client().prepareIndex("test", "type", "1") + .setSource(jsonBuilder().startObject().field("field", "a")) + .execute().actionGet(); + + MultiPercolateRequestBuilder builder = client().prepareMultiPercolate(); + int numPercolateRequest = randomIntBetween(50, 100); + for (int i = 0; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() + .setGetRequest(Requests.getRequest("test").type("type").id("1")) + .setIndices("test").setDocumentType("type")); + } + + MultiPercolateResponse response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest)); + for (MultiPercolateResponse.Item item : response) { + assertThat(item.isFailure(), equalTo(false)); + assertNoFailures(item.response()); + assertThat(item.getResponse().getCount(), equalTo((long) numQueries)); + assertThat(item.getResponse().getMatches().length, equalTo(numQueries)); + } + + // Non existing doc + builder = client().prepareMultiPercolate(); + for (int i = 0; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() + .setGetRequest(Requests.getRequest("test").type("type").id("2")) + .setIndices("test").setDocumentType("type")); + } + + response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest)); + for (MultiPercolateResponse.Item item : response) { + assertThat(item.isFailure(), equalTo(true)); + assertThat(item.errorMessage(), containsString("document missing")); + assertThat(item.getResponse(), nullValue()); + } + + // One existing doc + builder = client().prepareMultiPercolate(); + for (int i = 0; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() + .setGetRequest(Requests.getRequest("test").type("type").id("2")) + .setIndices("test").setDocumentType("type")); + } + builder.add( + client().preparePercolate() + .setGetRequest(Requests.getRequest("test").type("type").id("1")) + .setIndices("test").setDocumentType("type")); + + response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest + 1)); + assertThat(response.items()[numPercolateRequest].isFailure(), equalTo(false)); + assertNoFailures(response.items()[numPercolateRequest].response()); + assertThat(response.items()[numPercolateRequest].getResponse().getCount(), equalTo((long) numQueries)); + assertThat(response.items()[numPercolateRequest].getResponse().getMatches().length, equalTo(numQueries)); + } + + @Test + public void testWithDocsOnly() throws Exception { + client().admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + .build()) + .execute().actionGet(); + ensureGreen(); + + int numQueries = randomIntBetween(50, 100); + logger.info("--> register a queries"); + for (int i = 0; i < numQueries; i++) { + client().prepareIndex("test", "_percolator", Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject()) + .execute().actionGet(); + } + + MultiPercolateRequestBuilder builder = client().prepareMultiPercolate(); + int numPercolateRequest = randomIntBetween(50, 100); + for (int i = 0; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() + .setIndices("test").setDocumentType("type") + .setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field", "a").endObject()))); + } + + MultiPercolateResponse response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest)); + for (MultiPercolateResponse.Item item : response) { + assertThat(item.isFailure(), equalTo(false)); + assertNoFailures(item.response()); + assertThat(item.getResponse().getCount(), equalTo((long) numQueries)); + assertThat(item.getResponse().getMatches().length, equalTo(numQueries)); + } + + // All illegal json + builder = client().prepareMultiPercolate(); + for (int i = 0; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() + .setIndices("test").setDocumentType("type") + .setSource("illegal json")); + } + + response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest)); + for (MultiPercolateResponse.Item item : response) { + assertThat(item.isFailure(), equalTo(false)); + assertThat(item.getResponse().getSuccessfulShards(), equalTo(0)); + assertThat(item.getResponse().getShardFailures().length, equalTo(2)); + for (ShardOperationFailedException shardFailure : item.getResponse().getShardFailures()) { + assertThat(shardFailure.reason(), containsString("Failed to derive xcontent from")); + assertThat(shardFailure.status().getStatus(), equalTo(500)); + } + } + + // one valid request + builder = client().prepareMultiPercolate(); + for (int i = 0; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() + .setIndices("test").setDocumentType("type") + .setSource("illegal json")); + } + builder.add( + client().preparePercolate() + .setIndices("test").setDocumentType("type") + .setPercolateDoc(docBuilder().setDoc(jsonBuilder().startObject().field("field", "a").endObject()))); + + response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest + 1)); + assertThat(response.items()[numPercolateRequest].isFailure(), equalTo(false)); + assertNoFailures(response.items()[numPercolateRequest].getResponse()); + assertThat(response.items()[numPercolateRequest].getResponse().getCount(), equalTo((long ) numQueries)); + assertThat(response.items()[numPercolateRequest].getResponse().getMatches().length, equalTo(numQueries)); + } + } diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java index cc4711ac5b0..3cfb8f3fda3 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java @@ -22,9 +22,14 @@ package org.elasticsearch.test.integration.percolator; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.percolate.MultiPercolateRequestBuilder; +import org.elasticsearch.action.percolate.MultiPercolateResponse; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.node.internal.InternalNode; @@ -32,10 +37,17 @@ import org.elasticsearch.test.integration.AbstractNodesTests; import org.junit.After; import org.junit.Test; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; import static org.elasticsearch.client.Requests.clusterHealthRequest; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.integration.percolator.SimplePercolatorTests.convertFromTextArray; import static org.elasticsearch.test.integration.percolator.TTLPercolatorTests.ensureGreen; import static org.hamcrest.Matchers.*; @@ -251,4 +263,174 @@ public class RecoveryPercolatorTests extends AbstractNodesTests { assertThat(response.getMatches(), arrayWithSize(1)); assertThat(response.getMatches()[0].id().string(), equalTo("100")); } + + @Test + public void testSinglePercolator_recovery() throws Exception { + multiPercolatorRecovery(false); + } + + @Test + public void testMultiPercolator_recovery() throws Exception { + multiPercolatorRecovery(true); + } + + // 3 nodes, 2 primary + 2 replicas per primary, so each node should have a copy of the data. + // We only start and stop nodes 2 and 3, so all requests should succeed and never be partial. + private void multiPercolatorRecovery(final boolean multiPercolate) throws Exception { + Settings settings = settingsBuilder() + .put("gateway.type", "none").build(); + logger.info("--> starting 3 nodes"); + startNode("node1", settings); + startNode("node2", settings); + startNode("node3", settings); + + final Client client = client("node1"); + client.admin().indices().prepareDelete().execute().actionGet(); + ensureGreen(client); + + client.admin().indices().prepareCreate("test") + .setSettings(settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 2) + ) + .execute().actionGet(); + ensureGreen(client); + + final int numQueries = randomIntBetween(50, 100); + logger.info("--> register a queries"); + for (int i = 0; i < numQueries; i++) { + client().prepareIndex("test", "_percolator", Integer.toString(i)) + .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject()) + .execute().actionGet(); + } + + client().prepareIndex("test", "type", "1") + .setSource(jsonBuilder().startObject().field("field", "a")) + .execute().actionGet(); + + final AtomicBoolean run = new AtomicBoolean(true); + final CountDownLatch done = new CountDownLatch(1); + final AtomicReference error = new AtomicReference(); + Runnable r = new Runnable() { + @Override + public void run() { + try { + XContentBuilder doc = null; + try { + doc = jsonBuilder().startObject().field("field", "a").endObject(); + } catch (IOException e) {} + + while (run.get()) { + /*NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo() + .execute().actionGet(); + String node2Id = null; + String node3Id = null; + for (NodeInfo nodeInfo : nodesInfoResponse) { + if ("node2".equals(nodeInfo.getNode().getName())) { + node2Id = nodeInfo.getNode().id(); + } else if ("node3".equals(nodeInfo.getNode().getName())) { + node3Id = nodeInfo.getNode().id(); + } + } + + String preference = "_prefer_node:" + (randomBoolean() ? node2Id : node3Id);*/ + if (multiPercolate) { + MultiPercolateRequestBuilder builder = client() + .prepareMultiPercolate(); + int numPercolateRequest = randomIntBetween(50, 100); + + for (int i = 0; i < numPercolateRequest / 2; i++) { + builder.add( + client().preparePercolate() +// .setPreference(preference) + .setIndices("test").setDocumentType("type") + .setPercolateDoc(docBuilder().setDoc(doc))); + } + + for (int i = numPercolateRequest / 2; i < numPercolateRequest; i++) { + builder.add( + client().preparePercolate() +// .setPreference(preference) + .setGetRequest(Requests.getRequest("test").type("type").id("1")) + .setIndices("test").setDocumentType("type") + ); + } + + MultiPercolateResponse response = builder.execute().actionGet(); + assertThat(response.items().length, equalTo(numPercolateRequest)); + for (MultiPercolateResponse.Item item : response) { + assertThat(item.isFailure(), equalTo(false)); + assertNoFailures(item.getResponse()); + assertThat(item.getResponse().getCount(), equalTo((long) numQueries)); + assertThat(item.getResponse().getMatches().length, equalTo(numQueries)); + } + } else { + PercolateResponse response; + if (randomBoolean()) { + response = client().preparePercolate() + .setIndices("test").setDocumentType("type") + .setPercolateDoc(docBuilder().setDoc(doc)) +// .setPreference(preference) + .execute().actionGet(); + } else { + response = client().preparePercolate() + .setGetRequest(Requests.getRequest("test").type("type").id("1")) + .setIndices("test").setDocumentType("type") +// .setPreference(preference) + .execute().actionGet(); + } + assertNoFailures(response); + assertThat(response.getCount(), equalTo((long) numQueries)); + assertThat(response.getMatches().length, equalTo(numQueries)); + } + } + } catch (Throwable t) { + logger.info("Error in percolate thread...", t); + run.set(false); + error.set(t); + } finally { + done.countDown(); + } + } + }; + new Thread(r).start(); + + try { + for (int i = 0; i < 4; i++) { + closeNode("node3"); + client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForYellowStatus() + .setWaitForNodes("2") + .execute().actionGet(); + assertThat(error.get(), nullValue()); + closeNode("node2"); + client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForYellowStatus() + .setWaitForNodes("1") + .execute().actionGet(); + assertThat(error.get(), nullValue()); + startNode("node3"); + client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForYellowStatus() + .setWaitForNodes("2") + .execute().actionGet(); + assertThat(error.get(), nullValue()); + startNode("node2"); + client().admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForYellowStatus() + .setWaitForNodes("3") + .execute().actionGet(); + assertThat(error.get(), nullValue()); + } + } finally { + run.set(false); + } + done.await(); + assertThat(error.get(), nullValue()); + } + }