Added version support to mget and get apis, that only will perform the get operation if the version of the document to be fetched matches with the provided version.

Both get and mget apis now support the following additional parameters: `version` and `version_type`.

Closes #3404
This commit is contained in:
Martijn van Groningen 2013-07-29 15:52:52 +02:00
parent 3b2a9fc86b
commit a9dd3c9756
14 changed files with 377 additions and 25 deletions

View File

@ -276,7 +276,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
String percolate = null;
int retryOnConflict = 0;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)

View File

@ -26,6 +26,8 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Required;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
@ -53,6 +55,9 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
Boolean realtime;
private VersionType versionType = VersionType.INTERNAL;
private long version = Versions.MATCH_ANY;
GetRequest() {
type = "_all";
}
@ -197,6 +202,31 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
return this;
}
/**
* Sets the version, which will cause the get operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public long version() {
return version;
}
public GetRequest version(long version) {
this.version = version;
return this;
}
/**
* Sets the versioning type. Defaults to {@link org.elasticsearch.index.VersionType#INTERNAL}.
*/
public GetRequest versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
public VersionType versionType() {
return this.versionType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -218,6 +248,8 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
} else if (realtime == 1) {
this.realtime = true;
}
this.versionType = VersionType.fromValue(in.readByte());
this.version = in.readVLong();
}
@Override
@ -244,6 +276,9 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
} else {
out.writeByte((byte) 1);
}
out.writeByte(versionType.getValue());
out.writeVLong(version);
}
@Override

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest
import org.elasticsearch.client.Client;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.VersionType;
/**
* A get document action request builder.
@ -107,6 +108,23 @@ public class GetRequestBuilder extends SingleShardOperationRequestBuilder<GetReq
return this;
}
/**
* Sets the version, which will cause the get operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public GetRequestBuilder setVersion(long version) {
request.version(version);
return this;
}
/**
* Sets the versioning type. Defaults to {@link org.elasticsearch.index.VersionType#INTERNAL}.
*/
public GetRequestBuilder setVersionType(VersionType versionType) {
request.versionType(versionType);
return this;
}
@Override
protected void doExecute(ActionListener<GetResponse> listener) {
((Client) client).get(request, listener);

View File

@ -29,8 +29,10 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
import java.util.ArrayList;
@ -47,6 +49,8 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
private String id;
private String routing;
private String[] fields;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
Item() {
@ -110,6 +114,24 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
return this.fields;
}
public long version() {
return version;
}
public Item version(long version) {
this.version = version;
return this;
}
public VersionType versionType() {
return versionType;
}
public Item versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
public static Item readItem(StreamInput in) throws IOException {
Item item = new Item();
item.readFrom(in);
@ -129,6 +151,8 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
fields[i] = in.readString();
}
}
version = in.readVLong();
versionType = VersionType.fromValue(in.readByte());
}
@Override
@ -145,6 +169,8 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
out.writeString(field);
}
}
out.writeVLong(version);
out.writeByte(versionType.getValue());
}
}
@ -241,6 +267,9 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
String routing = null;
String parent = null;
List<String> fields = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -258,6 +287,10 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
} else if ("fields".equals(currentFieldName)) {
fields = new ArrayList<String>();
fields.add(parser.text());
} else if ("_version".equals(currentFieldName) || "version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) {
versionType = VersionType.fromString(parser.text());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if ("fields".equals(currentFieldName)) {
@ -274,7 +307,7 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> {
} else {
aFields = defaultFields;
}
add(new Item(index, type, id).routing(routing).fields(aFields).parent(parent));
add(new Item(index, type, id).routing(routing).fields(aFields).parent(parent).version(version).versionType(versionType));
}
} else if ("ids".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {

View File

@ -20,10 +20,12 @@
package org.elasticsearch.action.get;
import gnu.trove.list.array.TIntArrayList;
import gnu.trove.list.array.TLongArrayList;
import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
import java.util.ArrayList;
@ -40,6 +42,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
List<String> types;
List<String> ids;
List<String[]> fields;
TLongArrayList versions;
List<VersionType> versionTypes;
MultiGetShardRequest() {
@ -52,6 +56,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
types = new ArrayList<String>();
ids = new ArrayList<String>();
fields = new ArrayList<String[]>();
versions = new TLongArrayList();
versionTypes = new ArrayList<VersionType>();
}
public int shardId() {
@ -90,11 +96,13 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
return this;
}
public void add(int location, @Nullable String type, String id, String[] fields) {
public void add(int location, @Nullable String type, String id, String[] fields, long version, VersionType versionType) {
this.locations.add(location);
this.types.add(type);
this.ids.add(id);
this.fields.add(fields);
this.versions.add(version);
this.versionTypes.add(versionType);
}
@Override
@ -105,6 +113,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
types = new ArrayList<String>(size);
ids = new ArrayList<String>(size);
fields = new ArrayList<String[]>(size);
versions = new TLongArrayList(size);
versionTypes = new ArrayList<VersionType>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
@ -123,6 +133,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
} else {
fields.add(null);
}
versions.add(in.readVLong());
versionTypes.add(VersionType.fromValue(in.readByte()));
}
preference = in.readOptionalString();
@ -156,6 +168,8 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
out.writeString(field);
}
}
out.writeVLong(versions.get(i));
out.writeByte(versionTypes.get(i).getValue());
}
out.writeOptionalString(preference);

View File

@ -99,7 +99,8 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
indexShard.refresh(new Engine.Refresh(false));
}
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(), request.realtime());
GetResult result = indexShard.getService().get(request.type(), request.id(), request.fields(),
request.realtime(), request.version(), request.versionType());
return new GetResponse(result);
}

View File

@ -82,7 +82,7 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
shardRequests.put(shardId, shardRequest);
}
shardRequest.add(i, item.type(), item.id(), item.fields());
shardRequest.add(i, item.type(), item.id(), item.fields(), item.version(), item.versionType());
}
if (shardRequests.size() == 0) {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.service.IndexService;
@ -111,9 +112,14 @@ public class TransportShardMultiGetAction extends TransportShardSingleOperationA
String type = request.types.get(i);
String id = request.ids.get(i);
String[] fields = request.fields.get(i);
long version = request.versions.get(i);
VersionType versionType = request.versionTypes.get(i);
if (versionType == null) {
versionType = VersionType.INTERNAL;
}
try {
GetResult getResult = indexShard.getService().get(type, id, fields, request.realtime());
GetResult getResult = indexShard.getService().get(type, id, fields, request.realtime(), version, versionType);
response.add(request.locations.get(i), new GetResponse(getResult));
} catch (Exception e) {
logger.debug("[{}][{}] failed to execute multi_get for [{}]/[{}]", e, request.index(), shardId, type, id);

View File

@ -17,7 +17,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.DocumentSourceMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
@ -65,7 +64,8 @@ public class UpdateHelper extends AbstractComponent {
public Result prepare(UpdateRequest request, IndexShard indexShard) {
long getDate = System.currentTimeMillis();
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true);
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME},
true, request.version(), request.versionType());
if (!getResult.isExists()) {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
@ -86,11 +86,6 @@ public class UpdateHelper extends AbstractComponent {
return new Result(indexRequest, Operation.UPSERT, null, null);
}
if (request.versionType().isVersionConflict(getResult.getVersion(), request.version())) {
throw new VersionConflictEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id(),
getResult.getVersion(), request.version());
}
long updateVersion = getResult.getVersion();
if (request.versionType() == VersionType.EXTERNAL) {
updateVersion = request.version(); // remember, match_any is excluded by the conflict test

View File

@ -823,6 +823,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final boolean realtime;
private final Term uid;
private boolean loadSource = true;
private long version;
private VersionType versionType;
public Get(boolean realtime, Term uid) {
this.realtime = realtime;
@ -845,6 +847,24 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
this.loadSource = loadSource;
return this;
}
public long version() {
return version;
}
public Get version(long version) {
this.version = version;
return this;
}
public VersionType versionType() {
return versionType;
}
public Get versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
}
static class GetResult {

View File

@ -50,6 +50,7 @@ import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.policy.IndexUpgraderMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
@ -315,6 +316,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (versionValue.delete()) {
return GetResult.NOT_EXISTS;
}
if (get.version() != Versions.MATCH_ANY) {
if (get.versionType().isVersionConflict(versionValue.version(), get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), versionValue.version(), get.version());
}
}
if (!get.loadSource()) {
return new GetResult(true, versionValue.version(), null);
}
@ -332,19 +339,31 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// no version, get the version from the index, we know that we refresh on flush
Searcher searcher = searcher();
final Versions.DocIdAndVersion docIdAndVersion;
try {
final Versions.DocIdAndVersion docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
if (docIdAndVersion != null) {
return new GetResult(searcher, docIdAndVersion);
}
// don't release the searcher on this path, it is the responsability of the caller to call GetResult.release
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
} catch (Exception e) {
searcher.release();
//TODO: A better exception goes here
throw new EngineException(shardId(), "Couldn't resolve version", e);
}
searcher.release();
return GetResult.NOT_EXISTS;
if (get.version() != Versions.MATCH_ANY && docIdAndVersion != null) {
if (get.versionType().isVersionConflict(docIdAndVersion.version, get.version())) {
searcher.release();
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
}
}
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the responsability of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion);
} else {
searcher.release();
return GetResult.NOT_EXISTS;
}
} finally {
rwl.readLock().unlock();
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
@ -94,11 +95,11 @@ public class ShardGetService extends AbstractIndexShardComponent {
return this;
}
public GetResult get(String type, String id, String[] gFields, boolean realtime) throws ElasticSearchException {
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType) throws ElasticSearchException {
currentMetric.inc();
try {
long now = System.nanoTime();
GetResult getResult = innerGet(type, id, gFields, realtime);
GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType);
if (getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
} else {
@ -143,12 +144,13 @@ public class ShardGetService extends AbstractIndexShardComponent {
}
}
public GetResult innerGet(String type, String id, String[] gFields, boolean realtime) throws ElasticSearchException {
public GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType) throws ElasticSearchException {
boolean loadSource = gFields == null || gFields.length > 0;
Engine.GetResult get = null;
if (type == null || type.equals("_all")) {
for (String typeX : mapperService.types()) {
get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(typeX, id))).loadSource(loadSource));
get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(typeX, id)))
.loadSource(loadSource).version(version).versionType(versionType));
if (get.exists()) {
type = typeX;
break;
@ -164,7 +166,8 @@ public class ShardGetService extends AbstractIndexShardComponent {
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);
}
} else {
get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(type, id))).loadSource(loadSource));
get = indexShard.get(new Engine.Get(realtime, new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(type, id)))
.loadSource(loadSource).version(version).versionType(versionType));
if (!get.exists()) {
get.release();
return new GetResult(shardId.index().name(), type, id, -1, false, null, null);

View File

@ -27,7 +27,9 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
@ -66,6 +68,9 @@ public class RestGetAction extends BaseRestHandler {
}
}
getRequest.version(RestActions.parseVersion(request));
getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
client.get(getRequest, new ActionListener<GetResponse>() {
@Override

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
@ -496,4 +497,207 @@ public class GetActionTests extends AbstractSharedClusterTest {
assertThat(responseBeforeFlush.getSourceAsString(), is(responseAfterFlush.getSourceAsString()));
}
@Test
public void testGetWithVersion() {
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)).execute().actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
GetResponse response = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(response.isExists(), equalTo(false));
logger.info("--> index doc 1");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1", "field2", "value2").execute().actionGet();
// From translog:
// version 0 means ignore version, which is the default
response = client().prepareGet("test", "type1", "1").setVersion(0).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(1l));
response = client().prepareGet("test", "type1", "1").setVersion(1).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(1l));
try {
client().prepareGet("test", "type1", "1").setVersion(2).execute().actionGet();
assert false;
} catch (VersionConflictEngineException e) {}
// From Lucene index:
client().admin().indices().prepareRefresh("test").execute().actionGet();
// version 0 means ignore version, which is the default
response = client().prepareGet("test", "type1", "1").setVersion(0).setRealtime(false).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(1l));
response = client().prepareGet("test", "type1", "1").setVersion(1).setRealtime(false).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(1l));
try {
client().prepareGet("test", "type1", "1").setVersion(2).setRealtime(false).execute().actionGet();
assert false;
} catch (VersionConflictEngineException e) {}
logger.info("--> index doc 1 again, so increasing the version");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1", "field2", "value2").execute().actionGet();
// From translog:
// version 0 means ignore version, which is the default
response = client().prepareGet("test", "type1", "1").setVersion(0).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(2l));
try {
client().prepareGet("test", "type1", "1").setVersion(1).execute().actionGet();
assert false;
} catch (VersionConflictEngineException e) {}
response = client().prepareGet("test", "type1", "1").setVersion(2).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(2l));
// From Lucene index:
client().admin().indices().prepareRefresh("test").execute().actionGet();
// version 0 means ignore version, which is the default
response = client().prepareGet("test", "type1", "1").setVersion(0).setRealtime(false).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(2l));
try {
client().prepareGet("test", "type1", "1").setVersion(1).setRealtime(false).execute().actionGet();
assert false;
} catch (VersionConflictEngineException e) {}
response = client().prepareGet("test", "type1", "1").setVersion(2).setRealtime(false).execute().actionGet();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getId(), equalTo("1"));
assertThat(response.getVersion(), equalTo(2l));
}
@Test
public void testMultiGetWithVersion() throws Exception {
try {
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// fine
}
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1)).execute().actionGet();
ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
MultiGetResponse response = client().prepareMultiGet().add("test", "type1", "1").execute().actionGet();
assertThat(response.getResponses().length, equalTo(1));
assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(false));
for (int i = 0; i < 3; i++) {
client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
// Version from translog
response = client().prepareMultiGet()
.add(new MultiGetRequest.Item("test", "type1", "1").version(0))
.add(new MultiGetRequest.Item("test", "type1", "1").version(1))
.add(new MultiGetRequest.Item("test", "type1", "1").version(2))
.execute().actionGet();
assertThat(response.getResponses().length, equalTo(3));
// [0] version doesn't matter, which is the default
assertThat(response.getResponses()[0].getFailure(), nullValue());
assertThat(response.getResponses()[0].getId(), equalTo("1"));
assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[1].getId(), equalTo("1"));
assertThat(response.getResponses()[1].getFailure(), nullValue());
assertThat(response.getResponses()[1].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[2].getFailure(), notNullValue());
assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1"));
assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("VersionConflictEngineException"));
//Version from Lucene index
client().admin().indices().prepareRefresh("test").execute().actionGet();
response = client().prepareMultiGet()
.add(new MultiGetRequest.Item("test", "type1", "1").version(0))
.add(new MultiGetRequest.Item("test", "type1", "1").version(1))
.add(new MultiGetRequest.Item("test", "type1", "1").version(2))
.setRealtime(false)
.execute().actionGet();
assertThat(response.getResponses().length, equalTo(3));
// [0] version doesn't matter, which is the default
assertThat(response.getResponses()[0].getFailure(), nullValue());
assertThat(response.getResponses()[0].getId(), equalTo("1"));
assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[1].getId(), equalTo("1"));
assertThat(response.getResponses()[1].getFailure(), nullValue());
assertThat(response.getResponses()[1].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[2].getFailure(), notNullValue());
assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1"));
assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("VersionConflictEngineException"));
for (int i = 0; i < 3; i++) {
client().prepareIndex("test", "type1", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
// Version from translog
response = client().prepareMultiGet()
.add(new MultiGetRequest.Item("test", "type1", "2").version(0))
.add(new MultiGetRequest.Item("test", "type1", "2").version(1))
.add(new MultiGetRequest.Item("test", "type1", "2").version(2))
.execute().actionGet();
assertThat(response.getResponses().length, equalTo(3));
// [0] version doesn't matter, which is the default
assertThat(response.getResponses()[0].getFailure(), nullValue());
assertThat(response.getResponses()[0].getId(), equalTo("2"));
assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2"));
assertThat(response.getResponses()[1].getFailure(), notNullValue());
assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2"));
assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("VersionConflictEngineException"));
assertThat(response.getResponses()[2].getId(), equalTo("2"));
assertThat(response.getResponses()[2].getFailure(), nullValue());
assertThat(response.getResponses()[2].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[2].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2"));
//Version from Lucene index
client().admin().indices().prepareRefresh("test").execute().actionGet();
response = client().prepareMultiGet()
.add(new MultiGetRequest.Item("test", "type1", "2").version(0))
.add(new MultiGetRequest.Item("test", "type1", "2").version(1))
.add(new MultiGetRequest.Item("test", "type1", "2").version(2))
.setRealtime(false)
.execute().actionGet();
assertThat(response.getResponses().length, equalTo(3));
// [0] version doesn't matter, which is the default
assertThat(response.getResponses()[0].getFailure(), nullValue());
assertThat(response.getResponses()[0].getId(), equalTo("2"));
assertThat(response.getResponses()[0].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[0].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2"));
assertThat(response.getResponses()[1].getFailure(), notNullValue());
assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2"));
assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("VersionConflictEngineException"));
assertThat(response.getResponses()[2].getId(), equalTo("2"));
assertThat(response.getResponses()[2].getFailure(), nullValue());
assertThat(response.getResponses()[2].getResponse().isExists(), equalTo(true));
assertThat(response.getResponses()[2].getResponse().getSourceAsMap().get("field").toString(), equalTo("value2"));
}
}