diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 42be0e4bed7..4261c5008e7 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -276,7 +276,6 @@ public class BulkRequest extends ActionRequest { String opType = null; long version = Versions.MATCH_ANY; VersionType versionType = VersionType.INTERNAL; - String percolate = null; int retryOnConflict = 0; // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) diff --git a/src/main/java/org/elasticsearch/action/get/GetRequest.java b/src/main/java/org/elasticsearch/action/get/GetRequest.java index 72e05d30514..13ccf8d6e9c 100644 --- a/src/main/java/org/elasticsearch/action/get/GetRequest.java +++ b/src/main/java/org/elasticsearch/action/get/GetRequest.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Required; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.index.VersionType; import java.io.IOException; @@ -53,6 +55,9 @@ public class GetRequest extends SingleShardOperationRequest { Boolean realtime; + private VersionType versionType = VersionType.INTERNAL; + private long version = Versions.MATCH_ANY; + GetRequest() { type = "_all"; } @@ -197,6 +202,31 @@ public class GetRequest extends SingleShardOperationRequest { return this; } + /** + * Sets the version, which will cause the get operation to only be performed if a matching + * version exists and no changes happened on the doc since then. + */ + public long version() { + return version; + } + + public GetRequest version(long version) { + this.version = version; + return this; + } + + /** + * Sets the versioning type. Defaults to {@link org.elasticsearch.index.VersionType#INTERNAL}. + */ + public GetRequest versionType(VersionType versionType) { + this.versionType = versionType; + return this; + } + + public VersionType versionType() { + return this.versionType; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -218,6 +248,8 @@ public class GetRequest extends SingleShardOperationRequest { } else if (realtime == 1) { this.realtime = true; } + this.versionType = VersionType.fromValue(in.readByte()); + this.version = in.readVLong(); } @Override @@ -244,6 +276,9 @@ public class GetRequest extends SingleShardOperationRequest { } else { out.writeByte((byte) 1); } + + out.writeByte(versionType.getValue()); + out.writeVLong(version); } @Override diff --git a/src/main/java/org/elasticsearch/action/get/GetRequestBuilder.java b/src/main/java/org/elasticsearch/action/get/GetRequestBuilder.java index 39993db9443..06bcb1b96ad 100644 --- a/src/main/java/org/elasticsearch/action/get/GetRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/get/GetRequestBuilder.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest import org.elasticsearch.client.Client; import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.VersionType; /** * A get document action request builder. @@ -107,6 +108,23 @@ public class GetRequestBuilder extends SingleShardOperationRequestBuilder listener) { ((Client) client).get(request, listener); diff --git a/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java b/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java index abbf7e8d00b..07707d733f3 100644 --- a/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java +++ b/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java @@ -29,8 +29,10 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.VersionType; import java.io.IOException; import java.util.ArrayList; @@ -47,6 +49,8 @@ public class MultiGetRequest extends ActionRequest { private String id; private String routing; private String[] fields; + private long version = Versions.MATCH_ANY; + private VersionType versionType = VersionType.INTERNAL; Item() { @@ -110,6 +114,24 @@ public class MultiGetRequest extends ActionRequest { return this.fields; } + public long version() { + return version; + } + + public Item version(long version) { + this.version = version; + return this; + } + + public VersionType versionType() { + return versionType; + } + + public Item versionType(VersionType versionType) { + this.versionType = versionType; + return this; + } + public static Item readItem(StreamInput in) throws IOException { Item item = new Item(); item.readFrom(in); @@ -129,6 +151,8 @@ public class MultiGetRequest extends ActionRequest { fields[i] = in.readString(); } } + version = in.readVLong(); + versionType = VersionType.fromValue(in.readByte()); } @Override @@ -145,6 +169,8 @@ public class MultiGetRequest extends ActionRequest { out.writeString(field); } } + out.writeVLong(version); + out.writeByte(versionType.getValue()); } } @@ -241,6 +267,9 @@ public class MultiGetRequest extends ActionRequest { String routing = null; String parent = null; List fields = null; + long version = Versions.MATCH_ANY; + VersionType versionType = VersionType.INTERNAL; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); @@ -258,6 +287,10 @@ public class MultiGetRequest extends ActionRequest { } else if ("fields".equals(currentFieldName)) { fields = new ArrayList(); fields.add(parser.text()); + } else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) { + version = parser.longValue(); + } else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) { + versionType = VersionType.fromString(parser.text()); } } else if (token == XContentParser.Token.START_ARRAY) { if ("fields".equals(currentFieldName)) { @@ -274,7 +307,7 @@ public class MultiGetRequest extends ActionRequest { } else { aFields = defaultFields; } - add(new Item(index, type, id).routing(routing).fields(aFields).parent(parent)); + add(new Item(index, type, id).routing(routing).fields(aFields).parent(parent).version(version).versionType(versionType)); } } else if ("ids".equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { diff --git a/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java b/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java index 72a2e2a7fa1..fc570afe97f 100644 --- a/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java +++ b/src/main/java/org/elasticsearch/action/get/MultiGetShardRequest.java @@ -20,10 +20,12 @@ package org.elasticsearch.action.get; import gnu.trove.list.array.TIntArrayList; +import gnu.trove.list.array.TLongArrayList; import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.VersionType; import java.io.IOException; import java.util.ArrayList; @@ -40,6 +42,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest types; List ids; List fields; + TLongArrayList versions; + List versionTypes; MultiGetShardRequest() { @@ -52,6 +56,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest(); ids = new ArrayList(); fields = new ArrayList(); + versions = new TLongArrayList(); + versionTypes = new ArrayList(); } public int shardId() { @@ -90,11 +96,13 @@ public class MultiGetShardRequest extends SingleShardOperationRequest(size); ids = new ArrayList(size); fields = new ArrayList(size); + versions = new TLongArrayList(size); + versionTypes = new ArrayList(size); for (int i = 0; i < size; i++) { locations.add(in.readVInt()); if (in.readBoolean()) { @@ -123,6 +133,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest 0; Engine.GetResult get = null; if (type == null || type.equals("_all")) { for (String typeX : mapperService.types()) { - get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(typeX, id))).loadSource(loadSource)); + get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(typeX, id))) + .loadSource(loadSource).version(version).versionType(versionType)); if (get.exists()) { type = typeX; break; @@ -164,7 +166,8 @@ public class ShardGetService extends AbstractIndexShardComponent { return new GetResult(shardId.index().name(), type, id, -1, false, null, null); } } else { - get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(type, id))).loadSource(loadSource)); + get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(type, id))) + .loadSource(loadSource).version(version).versionType(versionType)); if (!get.exists()) { get.release(); return new GetResult(shardId.index().name(), type, id, -1, false, null, null); diff --git a/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java b/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java index 7e23f2ae614..5d744d6391e 100644 --- a/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java +++ b/src/main/java/org/elasticsearch/rest/action/get/RestGetAction.java @@ -27,7 +27,9 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; import java.io.IOException; @@ -66,6 +68,9 @@ public class RestGetAction extends BaseRestHandler { } } + getRequest.version(RestActions.parseVersion(request)); + getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType())); + client.get(getRequest, new ActionListener() { @Override diff --git a/src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java b/src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java index bb24759847b..841a214e41f 100644 --- a/src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java +++ b/src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.junit.Test; @@ -496,4 +497,207 @@ public class GetActionTests extends AbstractSharedClusterTest { assertThat(responseBeforeFlush.getSourceAsString(), is(responseAfterFlush.getSourceAsString())); } + @Test + public void testGetWithVersion() { + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)).execute().actionGet(); + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + + GetResponse response = client().prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> index doc 1"); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1", "field2", "value2").execute().actionGet(); + + // From translog: + + // version 0 means ignore version, which is the default + response = client().prepareGet("test", "type1", "1").setVersion(0).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(1l)); + + response = client().prepareGet("test", "type1", "1").setVersion(1).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(1l)); + + try { + client().prepareGet("test", "type1", "1").setVersion(2).execute().actionGet(); + assert false; + } catch (VersionConflictEngineException e) {} + + // From Lucene index: + client().admin().indices().prepareRefresh("test").execute().actionGet(); + + // version 0 means ignore version, which is the default + response = client().prepareGet("test", "type1", "1").setVersion(0).setRealtime(false).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(1l)); + + response = client().prepareGet("test", "type1", "1").setVersion(1).setRealtime(false).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(1l)); + + try { + client().prepareGet("test", "type1", "1").setVersion(2).setRealtime(false).execute().actionGet(); + assert false; + } catch (VersionConflictEngineException e) {} + + logger.info("--> index doc 1 again, so increasing the version"); + client().prepareIndex("test", "type1", "1").setSource("field1", "value1", "field2", "value2").execute().actionGet(); + + // From translog: + + // version 0 means ignore version, which is the default + response = client().prepareGet("test", "type1", "1").setVersion(0).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(2l)); + + try { + client().prepareGet("test", "type1", "1").setVersion(1).execute().actionGet(); + assert false; + } catch (VersionConflictEngineException e) {} + + response = client().prepareGet("test", "type1", "1").setVersion(2).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(2l)); + + // From Lucene index: + client().admin().indices().prepareRefresh("test").execute().actionGet(); + + // version 0 means ignore version, which is the default + response = client().prepareGet("test", "type1", "1").setVersion(0).setRealtime(false).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(2l)); + + try { + client().prepareGet("test", "type1", "1").setVersion(1).setRealtime(false).execute().actionGet(); + assert false; + } catch (VersionConflictEngineException e) {} + + response = client().prepareGet("test", "type1", "1").setVersion(2).setRealtime(false).execute().actionGet(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getId(), equalTo("1")); + assertThat(response.getVersion(), equalTo(2l)); + } + + @Test + public void testMultiGetWithVersion() throws Exception { + try { + client().admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception e) { + // fine + } + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)).execute().actionGet(); + + ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + + MultiGetResponse response = client().prepareMultiGet().add("test", "type1", "1").execute().actionGet(); + assertThat(response.getResponses().length, equalTo(1)); + assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(false)); + + for (int i = 0; i < 3; i++) { + client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + + // Version from translog + response = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "type1", "1").version(0)) + .add(new MultiGetRequest.Item("test", "type1", "1").version(1)) + .add(new MultiGetRequest.Item("test", "type1", "1").version(2)) + .execute().actionGet(); + assertThat(response.getResponses().length, equalTo(3)); + // [0] version doesn't matter, which is the default + assertThat(response.getResponses()[0].getFailure(), nullValue()); + assertThat(response.getResponses()[0].getId(), equalTo("1")); + assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1")); + assertThat(response.getResponses()[1].getId(), equalTo("1")); + assertThat(response.getResponses()[1].getFailure(), nullValue()); + assertThat(response.getResponses()[1].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1")); + assertThat(response.getResponses()[2].getFailure(), notNullValue()); + assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1")); + assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("VersionConflictEngineException")); + + //Version from Lucene index + client().admin().indices().prepareRefresh("test").execute().actionGet(); + response = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "type1", "1").version(0)) + .add(new MultiGetRequest.Item("test", "type1", "1").version(1)) + .add(new MultiGetRequest.Item("test", "type1", "1").version(2)) + .setRealtime(false) + .execute().actionGet(); + assertThat(response.getResponses().length, equalTo(3)); + // [0] version doesn't matter, which is the default + assertThat(response.getResponses()[0].getFailure(), nullValue()); + assertThat(response.getResponses()[0].getId(), equalTo("1")); + assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1")); + assertThat(response.getResponses()[1].getId(), equalTo("1")); + assertThat(response.getResponses()[1].getFailure(), nullValue()); + assertThat(response.getResponses()[1].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1")); + assertThat(response.getResponses()[2].getFailure(), notNullValue()); + assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1")); + assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("VersionConflictEngineException")); + + + for (int i = 0; i < 3; i++) { + client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + + // Version from translog + response = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "type1", "2").version(0)) + .add(new MultiGetRequest.Item("test", "type1", "2").version(1)) + .add(new MultiGetRequest.Item("test", "type1", "2").version(2)) + .execute().actionGet(); + assertThat(response.getResponses().length, equalTo(3)); + // [0] version doesn't matter, which is the default + assertThat(response.getResponses()[0].getFailure(), nullValue()); + assertThat(response.getResponses()[0].getId(), equalTo("2")); + assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2")); + assertThat(response.getResponses()[1].getFailure(), notNullValue()); + assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2")); + assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("VersionConflictEngineException")); + assertThat(response.getResponses()[2].getId(), equalTo("2")); + assertThat(response.getResponses()[2].getFailure(), nullValue()); + assertThat(response.getResponses()[2].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[2].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2")); + + + //Version from Lucene index + client().admin().indices().prepareRefresh("test").execute().actionGet(); + response = client().prepareMultiGet() + .add(new MultiGetRequest.Item("test", "type1", "2").version(0)) + .add(new MultiGetRequest.Item("test", "type1", "2").version(1)) + .add(new MultiGetRequest.Item("test", "type1", "2").version(2)) + .setRealtime(false) + .execute().actionGet(); + assertThat(response.getResponses().length, equalTo(3)); + // [0] version doesn't matter, which is the default + assertThat(response.getResponses()[0].getFailure(), nullValue()); + assertThat(response.getResponses()[0].getId(), equalTo("2")); + assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2")); + assertThat(response.getResponses()[1].getFailure(), notNullValue()); + assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2")); + assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("VersionConflictEngineException")); + assertThat(response.getResponses()[2].getId(), equalTo("2")); + assertThat(response.getResponses()[2].getFailure(), nullValue()); + assertThat(response.getResponses()[2].getResponse().isExists(), equalTo(true)); + assertThat(response.getResponses()[2].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2")); + } + }