From 3f6877ec2b22cc6647b6e3cf06a7423c11c48a54 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 29 Jul 2013 13:51:01 +0200 Subject: [PATCH] Rewrote the percolate existing doc api. The percolate existing doc feature now reuses the get request instead of having a separate request body. Relates to #3380 --- .../action/percolate/PercolateRequest.java | 43 ++++-- .../percolate/PercolateRequestBuilder.java | 15 +- .../percolate/PercolateShardRequest.java | 19 +-- .../percolate/PercolateSourceBuilder.java | 106 -------------- .../percolate/TransportPercolateAction.java | 138 +++--------------- .../index/percolator/PercolatorService.java | 12 +- .../action/percolate/RestPercolateAction.java | 51 +++---- .../percolator/SimplePercolatorTests.java | 58 ++++++-- 8 files changed, 142 insertions(+), 300 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java b/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java index bd1f49fc1aa..a152093c079 100644 --- a/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java +++ b/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.percolate; import org.elasticsearch.ElasticSearchGenerationException; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.common.bytes.BytesArray; @@ -46,11 +47,12 @@ public class PercolateRequest extends BroadcastOperationRequest listener) { - resolveGet(request, listener); - } - - // Add redirect here if a request ends up on a non data node? In the case when percolating an existing doc this - // could be beneficial. - void resolveGet(PercolateRequest originalRequest, ActionListener listener) { - originalRequest.startTime = System.currentTimeMillis(); - BytesReference body = originalRequest.source(); - Tuple tuple = null; - - XContentParser parser = null; - try { - parser = XContentFactory.xContent(body).createParser(body); - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - if (token != XContentParser.Token.START_OBJECT) { - throw new ElasticSearchParseException("percolate request didn't start with start object"); - } - - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - // we need to check the "doc" here, so the next token will be START_OBJECT which is - // the actual document starting - if ("doc".equals(currentFieldName)) { - parser.close(); - super.doExecute(originalRequest, listener); + protected void doExecute(final PercolateRequest request, final ActionListener listener) { + request.startTime = System.currentTimeMillis(); + if (request.getRequest() != null) { + getAction.execute(request.getRequest(), new ActionListener() { + @Override + public void onResponse(GetResponse getResponse) { + if (!getResponse.isExists()) { + onFailure(new DocumentMissingException(null, request.getRequest().type(), request.getRequest().id())); return; } - } else if (token == XContentParser.Token.START_OBJECT) { - if ("get".equals(currentFieldName)) { - tuple = createGetRequest(parser, originalRequest.indices()[0], originalRequest.documentType()); - break; - } else { - parser.skipChildren(); - } - } else if (token == null) { - break; - } else { - parser.skipChildren(); - } - } - // docSource shouldn't be null - assert tuple != null; - executeGet(tuple, originalRequest, listener); - } catch (IOException e) { - throw new ElasticSearchParseException("failed to parse request", e); - } finally { - if (parser != null) { - parser.close(); - } + BytesReference docSource = getResponse.getSourceAsBytesRef(); + TransportPercolateAction.super.doExecute(new PercolateRequest(request, docSource), listener); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } else { + super.doExecute(request, listener); } } - void executeGet(Tuple tuple, final PercolateRequest originalRequest, final ActionListener listener) { - final GetRequest getRequest = tuple.v1(); - final Long getVersion = tuple.v2(); - getAction.execute(tuple.v1(), new ActionListener() { - @Override - public void onResponse(GetResponse getResponse) { - if (!getResponse.isExists()) { - onFailure(new DocumentMissingException(null, getRequest.type(), getRequest.id())); - return; - } - - if (getVersion != null && getVersion != getResponse.getVersion()) { - onFailure(new VersionConflictEngineException(null, getRequest.type(), getRequest.id(), getResponse.getVersion(), getVersion)); - return; - } - BytesReference fetchedSource = getResponse.getSourceAsBytesRef(); - TransportPercolateAction.super.doExecute(new PercolateRequest(originalRequest, fetchedSource), listener); - } - - @Override - public void onFailure(Throwable e) { - listener.onFailure(e); - } - }); - } - - Tuple createGetRequest(XContentParser parser, String index, String type) throws IOException { - String getCurrentField = null; - String getIndex = index; - String getType = type; - String getId = null; - Long getVersion = null; - String getRouting = null; - String getPreference = "_local"; - - XContentParser.Token token; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - getCurrentField = parser.currentName(); - } else if (token.isValue()) { - if ("index".equals(getCurrentField)) { - getIndex = parser.text(); - } else if ("type".equals(getCurrentField)) { - getType = parser.text(); - } else if ("id".equals(getCurrentField)) { - getId = parser.text(); - } else if ("version".equals(getCurrentField)) { - getVersion = parser.longValue(); - } else if ("routing".equals(getCurrentField)) { - getRouting = parser.text(); - } else if ("preference".equals(getCurrentField)) { - getPreference = parser.text(); - } - } - } - return new Tuple( - // We are on the network thread, so operationThreaded should be true - new GetRequest(getIndex).preference(getPreference).operationThreaded(true).type(getType).id(getId).routing(getRouting), - getVersion - ); - } - @Override protected String executor() { return ThreadPool.Names.PERCOLATE; diff --git a/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java index eb7222b38db..f33581b7ebb 100644 --- a/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java @@ -98,11 +98,11 @@ public class PercolatorService extends AbstractComponent { ParsedDocument parsedDocument; Query query; - if (request.fetchedDoc() != null) { - parsedDocument = parseFetchedDoc(request.fetchedDoc(), percolateIndexService, request.documentType()); + if (request.docSource() != null && request.docSource().length() != 0) { + parsedDocument = parseFetchedDoc(request.docSource(), percolateIndexService, request.documentType()); query = parseQueryOrFilter(percolateIndexService, request.source()); } else { - Tuple parseResult = parsePercolate(percolateIndexService, indexShard, request.documentType(), request.source()); + Tuple parseResult = parsePercolate(percolateIndexService, request.documentType(), request.source()); parsedDocument = parseResult.v1(); query = parseResult.v2(); } @@ -177,7 +177,7 @@ public class PercolatorService extends AbstractComponent { } } - private Tuple parsePercolate(IndexService documentIndexService, IndexShard indexShard, String type, BytesReference source) throws ElasticSearchException { + private Tuple parsePercolate(IndexService documentIndexService, String type, BytesReference source) throws ElasticSearchException { Query query = null; ParsedDocument doc = null; XContentParser parser = null; @@ -255,6 +255,10 @@ public class PercolatorService extends AbstractComponent { } private Query parseQueryOrFilter(IndexService documentIndexService, BytesReference source) { + if (source == null || source.length() == 0) { + return null; + } + Query query = null; XContentParser parser = null; try { diff --git a/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java b/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java index 6e58238d75f..5f6cfe2958d 100644 --- a/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java @@ -21,16 +21,18 @@ package org.elasticsearch.rest.action.percolate; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.percolate.PercolateRequest; import org.elasticsearch.action.percolate.PercolateResponse; -import org.elasticsearch.action.percolate.PercolateSourceBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestXContentBuilder; import java.io.IOException; @@ -49,20 +51,32 @@ public class RestPercolateAction extends BaseRestHandler { super(settings, client); controller.registerHandler(GET, "/{index}/{type}/_percolate", this); controller.registerHandler(POST, "/{index}/{type}/_percolate", this); - controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", new RestPercolatingExistingDocumentAction()); + controller.registerHandler(GET, "/{index}/{type}/{id}/_percolate", this); } @Override - public void handleRequest(RestRequest restRequest, RestChannel restChannel) { - PercolateRequest percolateRequest = new PercolateRequest(restRequest.param("index"), restRequest.param("type")); + public void handleRequest(final RestRequest restRequest, final RestChannel restChannel) { + String index = restRequest.param("index"); + String type = restRequest.param("type"); + + PercolateRequest percolateRequest = new PercolateRequest(index, type); percolateRequest.routing(restRequest.param("routing")); percolateRequest.preference(restRequest.param("preference")); percolateRequest.source(restRequest.content(), restRequest.contentUnsafe()); - executePercolateRequest(percolateRequest, restRequest, restChannel); - } + percolateRequest.routing(restRequest.param("routing")); + percolateRequest.preference(restRequest.param("preference")); + + GetRequest getRequest = new GetRequest(restRequest.param("get_index", index), restRequest.param("get_type", type), + restRequest.param("id")); + getRequest.routing(restRequest.param("get_routing")); + getRequest.preference(restRequest.param("get_preference")); + getRequest.refresh(restRequest.paramAsBoolean("refresh", getRequest.refresh())); + getRequest.realtime(restRequest.paramAsBooleanOptional("realtime", null)); + getRequest.version(RestActions.parseVersion(restRequest)); + getRequest.versionType(VersionType.fromString(restRequest.param("version_type"), getRequest.versionType())); + percolateRequest.getRequest(getRequest); - void executePercolateRequest(PercolateRequest percolateRequest, final RestRequest restRequest, final RestChannel restChannel) { // we just send a response, no need to fork percolateRequest.listenerThreaded(false); client.percolate(percolateRequest, new ActionListener() { @@ -116,29 +130,6 @@ public class RestPercolateAction extends BaseRestHandler { }); } - class RestPercolatingExistingDocumentAction implements RestHandler { - - @Override - public void handleRequest(RestRequest restRequest, RestChannel restChannel) { - String index = restRequest.param("index"); - String type = restRequest.param("type"); - PercolateRequest percolateRequest = new PercolateRequest(index, type); - percolateRequest.routing(restRequest.param("routing")); - percolateRequest.preference(restRequest.param("preference")); - - PercolateSourceBuilder builder = new PercolateSourceBuilder(); - builder.percolateGet().setIndex(restRequest.param("get_index", index)) - .setType(restRequest.param("get_type", type)) - .setId(restRequest.param("id")) - .setRouting(restRequest.param("get_routing")) - .setPreference(restRequest.param("get_preference")); - percolateRequest.source(builder); - - executePercolateRequest(percolateRequest, restRequest, restChannel); - } - - } - static final class Fields { static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards"); static final XContentBuilderString TOTAL = new XContentBuilderString("total"); diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java index d0492376b34..adcfd95696c 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java @@ -25,12 +25,14 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings.Builder; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilders; @@ -38,7 +40,6 @@ import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.junit.Test; import static org.elasticsearch.action.percolate.PercolateSourceBuilder.docBuilder; -import static org.elasticsearch.action.percolate.PercolateSourceBuilder.getBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.*; import static org.elasticsearch.index.query.QueryBuilders.*; @@ -113,6 +114,15 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { assertThat(searchResponse.getHits().totalHits(), equalTo(1L)); assertThat(searchResponse.getHits().getAt(0).type(), equalTo("type")); assertThat(searchResponse.getHits().getAt(0).id(), equalTo("1")); + + logger.info("--> Percolate non existing doc"); + try { + client().preparePercolate("test", "type") + .setGetRequest(Requests.getRequest("test").type("type").id("5")) + .execute().actionGet(); + fail("Exception should have been thrown"); + } catch (DocumentMissingException e) { + } } @Test @@ -601,29 +611,37 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { logger.info("--> Percolate existing doc with id 1"); PercolateResponse response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "1")) + .setGetRequest(Requests.getRequest("test").type("type").id("1")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(2)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4")); logger.info("--> Percolate existing doc with id 2"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "2")) + .setGetRequest(Requests.getRequest("test").type("type").id("2")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(2)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4")); logger.info("--> Percolate existing doc with id 3"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "3")) + .setGetRequest(Requests.getRequest("test").type("type").id("3")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(4)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4")); logger.info("--> Percolate existing doc with id 4"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "4")) + .setGetRequest(Requests.getRequest("test").type("type").id("4")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(1)); assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4")); @@ -667,29 +685,37 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { logger.info("--> Percolate existing doc with id 1"); PercolateResponse response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "1").setRouting("4")) + .setGetRequest(Requests.getRequest("test").type("type").id("1").routing("4")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(2)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "4")); logger.info("--> Percolate existing doc with id 2"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "2").setRouting("3")) + .setGetRequest(Requests.getRequest("test").type("type").id("2").routing("3")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(2)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4")); logger.info("--> Percolate existing doc with id 3"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "3").setRouting("2")) + .setGetRequest(Requests.getRequest("test").type("type").id("3").routing("2")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(4)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("1", "2", "3", "4")); logger.info("--> Percolate existing doc with id 4"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "4").setRouting("1")) + .setGetRequest(Requests.getRequest("test").type("type").id("4").routing("1")) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(1)); assertThat(convertFromTextArray(response.getMatches()), arrayContaining("4")); } @@ -725,17 +751,19 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { logger.info("--> Percolate existing doc with id 2 and version 1"); PercolateResponse response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "2").setVersion(1l)) + .setGetRequest(Requests.getRequest("test").type("type").id("2").version(1l)) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(2)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4")); logger.info("--> Percolate existing doc with id 2 and version 2"); try { - response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "2").setVersion(2l)) + client().preparePercolate("test", "type") + .setGetRequest(Requests.getRequest("test").type("type").id("2").version(2l)) .execute().actionGet(); - fail("Error should have been throwed"); + fail("Error should have been thrown"); } catch (VersionConflictEngineException e) { } @@ -744,8 +772,10 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { logger.info("--> Percolate existing doc with id 2 and version 2"); response = client().preparePercolate("test", "type") - .setPercolateGet(getBuilder("test", "type", "2").setVersion(2l)) + .setGetRequest(Requests.getRequest("test").type("type").id("2").version(2l)) .execute().actionGet(); + assertThat(response.getFailedShards(), equalTo(0)); + assertThat(response.getSuccessfulShards(), equalTo(5)); assertThat(response.getMatches(), arrayWithSize(2)); assertThat(convertFromTextArray(response.getMatches()), arrayContainingInAnyOrder("2", "4")); }