Added version support to update requests

Moved version handling from RobinEngine into VersionType. This avoids code re-use and makes it cleaner and easier to read.

Closes #3111
This commit is contained in:
Boaz Leskes 2013-06-13 19:37:33 +02:00
parent 71849668e9
commit 178629382c
13 changed files with 472 additions and 165 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -155,6 +156,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
} }
return this; return this;
} }
/** /**
* Adds an {@link DeleteRequest} to the list of actions to execute. * Adds an {@link DeleteRequest} to the list of actions to execute.
*/ */
@ -272,7 +274,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
String timestamp = null; String timestamp = null;
Long ttl = null; Long ttl = null;
String opType = null; String opType = null;
long version = 0; long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL; VersionType versionType = VersionType.INTERNAL;
String percolate = null; String percolate = null;
int retryOnConflict = 0; int retryOnConflict = 0;
@ -345,6 +347,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
.percolate(percolate), payload); .percolate(percolate), payload);
} else if ("update".equals(action)) { } else if ("update".equals(action)) {
internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict) internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from)) .source(data.slice(from, nextMarker - from))
.percolate(percolate), payload); .percolate(percolate), payload);
} }

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
@ -130,7 +131,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
private OpType opType = OpType.INDEX; private OpType opType = OpType.INDEX;
private boolean refresh = false; private boolean refresh = false;
private long version = 0; private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private String percolate; private String percolate;

View File

@ -14,8 +14,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.DocumentSourceMissingException; import org.elasticsearch.index.engine.DocumentSourceMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
@ -78,9 +80,23 @@ public class UpdateHelper extends AbstractComponent {
.refresh(request.refresh()) .refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false); indexRequest.operationThreaded(false);
if (request.versionType() == VersionType.EXTERNAL) {
// in external versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(VersionType.EXTERNAL);
}
return new Result(indexRequest, Operation.UPSERT, null, null); return new Result(indexRequest, Operation.UPSERT, null, null);
} }
if (request.versionType().isVersionConflict(getResult.getVersion(), request.version())) {
throw new VersionConflictEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id(),
getResult.getVersion(), request.version());
}
long updateVersion = getResult.getVersion();
if (request.versionType() == VersionType.EXTERNAL) {
updateVersion = request.version(); // remember, match_any is excluded by the conflict test
}
if (getResult.internalSourceRef() == null) { if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure... // no source, we can't do nothing, through a failure...
throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
@ -148,12 +164,11 @@ public class UpdateHelper extends AbstractComponent {
} }
} }
// TODO: external version type, does it make sense here? does not seem like it...
// TODO: because we use getResult.getVersion we loose the doc.version. The question is where is the right place?
if (operation == null || "index".equals(operation)) { if (operation == null || "index".equals(operation)) {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType) .source(updatedSourceAsMap, updateSourceContentType)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()) .version(updateVersion).versionType(request.versionType())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl) .timestamp(timestamp).ttl(ttl)
.percolate(request.percolate()) .percolate(request.percolate())
.refresh(request.refresh()); .refresh(request.refresh());
@ -161,7 +176,8 @@ public class UpdateHelper extends AbstractComponent {
return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType); return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
} else if ("delete".equals(operation)) { } else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); .version(updateVersion).versionType(request.versionType())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false); deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) { } else if ("none".equals(operation)) {

View File

@ -31,10 +31,12 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -59,7 +61,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private String[] fields; private String[] fields;
int retryOnConflict = 0; private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private int retryOnConflict = 0;
private String percolate; private String percolate;
@ -69,7 +73,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private IndexRequest upsertRequest; private IndexRequest upsertRequest;
private boolean docAsUpsert = false; private boolean docAsUpsert = false;
@Nullable @Nullable
@ -94,14 +98,19 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
if (id == null) { if (id == null) {
validationException = addValidationError("id is missing", validationException); validationException = addValidationError("id is missing", validationException);
} }
if (version != Versions.MATCH_ANY && retryOnConflict > 0) {
validationException = addValidationError("can't provide both retry_on_conflict and a specific version", validationException);
}
if (script == null && doc == null) { if (script == null && doc == null) {
validationException = addValidationError("script or doc is missing", validationException); validationException = addValidationError("script or doc is missing", validationException);
} }
if (script != null && doc != null) { if (script != null && doc != null) {
validationException = addValidationError("can't provide both script and doc", validationException); validationException = addValidationError("can't provide both script and doc", validationException);
} }
if(doc == null && docAsUpsert){ if (doc == null && docAsUpsert) {
validationException = addValidationError("can't say to upsert doc without providing doc", validationException); validationException = addValidationError("can't say to upsert doc without providing doc", validationException);
} }
return validationException; return validationException;
} }
@ -285,6 +294,31 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return this.retryOnConflict; return this.retryOnConflict;
} }
/**
* Sets the version, which will cause the index operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public UpdateRequest version(long version) {
this.version = version;
return this;
}
public long version() {
return this.version;
}
/**
* Sets the versioning type. Defaults to {@link VersionType#INTERNAL}.
*/
public UpdateRequest versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
public VersionType versionType() {
return this.versionType;
}
/** /**
* Causes the update request document to be percolated. The parameter is the percolate query * Causes the update request document to be percolated. The parameter is the percolate query
* to use to reduce the percolated queries that are going to run against this doc. Can be * to use to reduce the percolated queries that are going to run against this doc. Can be
@ -396,6 +430,14 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return this; return this;
} }
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequest doc(String field, Object value) {
safeDoc().source(field, value);
return this;
}
public IndexRequest doc() { public IndexRequest doc() {
return this.doc; return this.doc;
} }
@ -513,8 +555,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
XContentBuilder docBuilder = XContentFactory.contentBuilder(xContentType); XContentBuilder docBuilder = XContentFactory.contentBuilder(xContentType);
docBuilder.copyCurrentStructure(parser); docBuilder.copyCurrentStructure(parser);
safeDoc().source(docBuilder); safeDoc().source(docBuilder);
} else if("doc_as_upsert".equals(currentFieldName)){ } else if ("doc_as_upsert".equals(currentFieldName)) {
docAsUpsert(parser.booleanValue()); docAsUpsert(parser.booleanValue());
} }
} }
} finally { } finally {
@ -526,9 +568,10 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
public boolean docAsUpsert() { public boolean docAsUpsert() {
return this.docAsUpsert; return this.docAsUpsert;
} }
public void docAsUpsert(boolean shouldUpsertDoc) { public void docAsUpsert(boolean shouldUpsertDoc) {
this.docAsUpsert = shouldUpsertDoc; this.docAsUpsert = shouldUpsertDoc;
if(this.doc != null && this.upsertRequest == null){ if (this.doc != null && this.upsertRequest == null) {
upsert(doc); upsert(doc);
} }
} }
@ -565,6 +608,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
if (in.getVersion().onOrAfter(Version.V_0_90_2)) { if (in.getVersion().onOrAfter(Version.V_0_90_2)) {
docAsUpsert = in.readBoolean(); docAsUpsert = in.readBoolean();
} }
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
} }
@Override @Override
@ -612,6 +657,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
if (out.getVersion().onOrAfter(Version.V_0_90_2)) { if (out.getVersion().onOrAfter(Version.V_0_90_2)) {
out.writeBoolean(docAsUpsert); out.writeBoolean(docAsUpsert);
} }
out.writeLong(version);
out.writeByte(versionType.getValue());
} }
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import java.util.Map; import java.util.Map;
@ -124,6 +125,24 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
return this; return this;
} }
/**
* Sets the version, which will cause the index operation to only be performed if a matching
* version exists and no changes happened on the doc since then.
*/
public UpdateRequestBuilder setVersion(long version) {
request.version(version);
return this;
}
/**
* Sets the versioning type. Defaults to {@link org.elasticsearch.index.VersionType#INTERNAL}.
*/
public UpdateRequestBuilder setVersionType(VersionType versionType) {
request.versionType(versionType);
return this;
}
/** /**
* Should a refresh be executed post this update operation causing the operation to * Should a refresh be executed post this update operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults * be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
@ -216,6 +235,14 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
return this; return this;
} }
/**
* Sets the doc to use for updates when a script is not specified.
*/
public UpdateRequestBuilder setDoc(String field, Object value) {
request.doc(field, value);
return this;
}
/** /**
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException} * Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
* is thrown. * is thrown.
@ -305,5 +332,4 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
protected void doExecute(ActionListener<UpdateResponse> listener) { protected void doExecute(ActionListener<UpdateResponse> listener) {
((Client) client).update(request, listener); ((Client) client).update(request, listener);
} }
} }

View File

@ -19,13 +19,47 @@
package org.elasticsearch.index; package org.elasticsearch.index;
import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.lucene.uid.Versions;
/** /**
* *
*/ */
public enum VersionType { public enum VersionType {
INTERNAL((byte) 0), INTERNAL((byte) 0) {
EXTERNAL((byte) 1); /**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always accepts expectedVersion == {@link Versions#MATCH_ANY}
* - if expectedVersion is set, always conflict if currentVersion == {@link Versions#NOT_FOUND}
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
return currentVersion != Versions.NOT_SET && expectedVersion != Versions.MATCH_ANY
&& (currentVersion == Versions.NOT_FOUND || currentVersion != expectedVersion);
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
}
},
EXTERNAL((byte) 1) {
/**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always conflict if expectedVersion == {@link Versions#MATCH_ANY} (we need something to set)
* - accepts currentVersion == {@link Versions#NOT_FOUND}
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
return currentVersion != Versions.NOT_SET && currentVersion != Versions.NOT_FOUND
&& (expectedVersion == Versions.MATCH_ANY || currentVersion >= expectedVersion);
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
};
private final byte value; private final byte value;
@ -37,6 +71,20 @@ public enum VersionType {
return value; return value;
} }
/**
* Checks whether the current version conflicts with the expected version, based on the current version type.
*
* @return true if versions conflict false o.w.
*/
public abstract boolean isVersionConflict(long currentVersion, long expectedVersion);
/**
* Returns the new version for a document, based on it's current one and the specified in the request
*
* @return new version
*/
public abstract long updateVersion(long currentVersion, long expectedVersion);
public static VersionType fromString(String versionType) { public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) { if ("internal".equals(versionType)) {
return INTERNAL; return INTERNAL;

View File

@ -394,49 +394,23 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// same logic as index // same logic as index
long updatedVersion; long updatedVersion;
long expectedVersion = create.version();
if (create.origin() == Operation.Origin.PRIMARY) { if (create.origin() == Operation.Origin.PRIMARY) {
if (create.versionType() == VersionType.INTERNAL) { // internal version type if (create.versionType().isVersionConflict(currentVersion, expectedVersion)) {
long expectedVersion = create.version(); throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) {
// an explicit version is provided, see if there is a conflict
// if we did not find anything, and a version is provided, so we do expect to find a doc under that version
// this is important, since we don't allow to preset a version in order to handle deletes
if (currentVersion == Versions.NOT_FOUND) {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), Versions.NOT_FOUND, expectedVersion);
} else if (expectedVersion != currentVersion) {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
}
}
updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
} else { // external version type
// an external version is provided, just check, if a local version exists, that its higher than it
// the actual version checking is one in an external system, and we just want to not index older versions
if (currentVersion >= 0) { // we can check!, its there
if (currentVersion >= create.version()) {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, create.version());
}
}
updatedVersion = create.version();
} }
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
long expectedVersion = create.version(); // replicas treat the version as "external" as it comes from the primary ->
if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... // only exploding if the version they got is lower or equal to what they know.
// if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) {
// then nothing to check if (create.origin() == Operation.Origin.RECOVERY) {
if (!(currentVersion == Versions.NOT_FOUND && create.version() == 1)) { return;
// with replicas/recovery, we only check for previous version, we allow to set a future version } else {
if (expectedVersion <= currentVersion) { throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
}
}
} }
} }
// replicas already hold the "future" version updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion);
updatedVersion = create.version();
} }
// if the doc does not exists or it exists but not delete // if the doc does not exists or it exists but not delete
@ -516,49 +490,25 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
long updatedVersion; long updatedVersion;
long expectedVersion = index.version();
if (index.origin() == Operation.Origin.PRIMARY) { if (index.origin() == Operation.Origin.PRIMARY) {
if (index.versionType() == VersionType.INTERNAL) { // internal version type if (index.versionType().isVersionConflict(currentVersion, expectedVersion)) {
long expectedVersion = index.version(); throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { }
// an explicit version is provided, see if there is a conflict
// if we did not find anything, and a version is provided, so we do expect to find a doc under that version updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
// this is important, since we don't allow to preset a version in order to handle deletes
if (currentVersion == Versions.NOT_FOUND) {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), Versions.NOT_FOUND, expectedVersion);
} else if (expectedVersion != currentVersion) {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
} else { // external version type
// an external version is provided, just check, if a local version exists, that its higher than it
// the actual version checking is one in an external system, and we just want to not index older versions
if (currentVersion >= 0) { // we can check!, its there
if (currentVersion >= index.version()) {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, index.version());
}
}
updatedVersion = index.version();
}
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
long expectedVersion = index.version(); // replicas treat the version as "external" as it comes from the primary ->
if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... // only exploding if the version they got is lower or equal to what they know.
// if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) {
// then nothing to check if (index.origin() == Operation.Origin.RECOVERY) {
if (!(currentVersion == Versions.NOT_FOUND && index.version() == 1)) { return;
// with replicas/recovery, we only check for previous version, we allow to set a future version } else {
if (expectedVersion <= currentVersion) { throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
} }
} }
// replicas already hold the "future" version updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion);
updatedVersion = index.version();
} }
index.version(updatedVersion); index.version(updatedVersion);
@ -571,7 +521,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
writer.addDocument(index.docs().get(0), index.analyzer()); writer.addDocument(index.docs().get(0), index.analyzer());
} }
} else { } else {
if (versionValue != null) index.created(versionValue.delete()); // we have a delete which is not GC'ed... if (versionValue != null) {
index.created(versionValue.delete()); // we have a delete which is not GC'ed...
}
if (index.docs().size() > 1) { if (index.docs().size() > 1) {
writer.updateDocuments(index.uid(), index.docs(), index.analyzer()); writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else { } else {
@ -629,45 +581,25 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
long updatedVersion; long updatedVersion;
long expectedVersion = delete.version();
if (delete.origin() == Operation.Origin.PRIMARY) { if (delete.origin() == Operation.Origin.PRIMARY) {
if (delete.versionType() == VersionType.INTERNAL) { // internal version type if (delete.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (delete.version() != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, expectedVersion);
// an explicit version is provided, see if there is a conflict }
// if we did not find anything and a version is provided, so we do expect to find a doc under that version
if (currentVersion == Versions.NOT_FOUND) { updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), Versions.NOT_FOUND, delete.version());
} else if (delete.version() != currentVersion) {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version());
}
}
updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
} else { // External
if (currentVersion == Versions.NOT_FOUND) {
// its an external version, that's fine, we allow it to be set
//throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), UidField.DocIdAndVersion.Versions.NOT_FOUND, delete.version());
} else if (currentVersion >= delete.version()) {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version());
}
updatedVersion = delete.version();
}
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
// on replica, the version is the future value expected (returned from the operation on the primary) // replicas treat the version as "external" as it comes from the primary ->
if (currentVersion != Versions.NOT_SET) { // we don't have a version in the index, ignore // only exploding if the version they got is lower or equal to what they know.
// only check if we have a version for it, otherwise, ignore (see later) if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) {
if (currentVersion != Versions.NOT_FOUND) { if (delete.origin() == Operation.Origin.RECOVERY) {
// with replicas, we only check for previous version, we allow to set a future version return;
if (delete.version() <= currentVersion) { } else {
if (delete.origin() == Operation.Origin.RECOVERY) { throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, expectedVersion);
return;
} else {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, delete.version());
}
}
} }
} }
// replicas already hold the "future" version updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion);
updatedVersion = delete.version();
} }
if (currentVersion == Versions.NOT_FOUND) { if (currentVersion == Versions.NOT_FOUND) {
@ -1056,7 +988,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
snapshotIndexCommit = deletionPolicy.snapshot(); snapshotIndexCommit = deletionPolicy.snapshot();
traslogSnapshot = translog.snapshot(); traslogSnapshot = translog.snapshot();
} catch (Exception e) { } catch (Exception e) {
if (snapshotIndexCommit != null) snapshotIndexCommit.release(); if (snapshotIndexCommit != null) {
snapshotIndexCommit.release();
}
throw new SnapshotFailedEngineException(shardId, e); throw new SnapshotFailedEngineException(shardId, e);
} finally { } finally {
rwl.readLock().unlock(); rwl.readLock().unlock();

View File

@ -86,6 +86,9 @@ public class RestUpdateAction extends BaseRestHandler {
} }
} }
updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict()));
updateRequest.version(RestActions.parseVersion(request));
updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType()));
// see if we have it in the body // see if we have it in the body
if (request.hasContent()) { if (request.hasContent()) {

View File

@ -20,9 +20,10 @@ package org.elasticsearch.test.hamcrest;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -65,14 +66,14 @@ public class ElasticsearchAssertions {
public static void assertSearchHit(SearchResponse searchResponse, int number, Matcher<SearchHit> matcher) { public static void assertSearchHit(SearchResponse searchResponse, int number, Matcher<SearchHit> matcher) {
assert number > 0; assert number > 0;
assertThat("SearchHit number must be greater than 0", number, greaterThan(0)); assertThat("SearchHit number must be greater than 0", number, greaterThan(0));
assertThat(searchResponse.getHits().totalHits(), greaterThanOrEqualTo((long)number)); assertThat(searchResponse.getHits().totalHits(), greaterThanOrEqualTo((long) number));
assertSearchHit(searchResponse.getHits().getAt(number-1), matcher); assertSearchHit(searchResponse.getHits().getAt(number - 1), matcher);
} }
public static void assertNoFailures(SearchResponse searchResponse) { public static void assertNoFailures(SearchResponse searchResponse) {
assertThat("Unexpectd ShardFailures: " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); assertThat("Unexpectd ShardFailures: " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0));
} }
public static void assertNoFailures(BroadcastOperationResponse response) { public static void assertNoFailures(BroadcastOperationResponse response) {
assertThat("Unexpectd ShardFailures: " + Arrays.toString(response.getShardFailures()), response.getFailedShards(), equalTo(0)); assertThat("Unexpectd ShardFailures: " + Arrays.toString(response.getShardFailures()), response.getFailedShards(), equalTo(0));
} }
@ -88,16 +89,16 @@ public class ElasticsearchAssertions {
assertThat(resp.getHits().hits()[hit].getHighlightFields().get(field).fragments().length, greaterThan(fragment)); assertThat(resp.getHits().hits()[hit].getHighlightFields().get(field).fragments().length, greaterThan(fragment));
assertThat(resp.getHits().hits()[hit].highlightFields().get(field).fragments()[fragment].string(), matcher); assertThat(resp.getHits().hits()[hit].highlightFields().get(field).fragments()[fragment].string(), matcher);
} }
public static void assertSuggestionSize(Suggest searchSuggest, int entry, int size, String key) { public static void assertSuggestionSize(Suggest searchSuggest, int entry, int size, String key) {
assertThat(searchSuggest, notNullValue()); assertThat(searchSuggest, notNullValue());
assertThat(searchSuggest.size(),greaterThanOrEqualTo(1)); assertThat(searchSuggest.size(), greaterThanOrEqualTo(1));
assertThat(searchSuggest.getSuggestion(key).getName(), equalTo(key)); assertThat(searchSuggest.getSuggestion(key).getName(), equalTo(key));
assertThat(searchSuggest.getSuggestion(key).getEntries().size(), greaterThanOrEqualTo(entry)); assertThat(searchSuggest.getSuggestion(key).getEntries().size(), greaterThanOrEqualTo(entry));
assertThat(searchSuggest.getSuggestion(key).getEntries().get(entry).getOptions().size(), equalTo(size)); assertThat(searchSuggest.getSuggestion(key).getEntries().get(entry).getOptions().size(), equalTo(size));
} }
public static void assertSuggestion(Suggest searchSuggest, int entry, int ord, String key, String text) { public static void assertSuggestion(Suggest searchSuggest, int entry, int ord, String key, String text) {
assertThat(searchSuggest, notNullValue()); assertThat(searchSuggest, notNullValue());
assertThat(searchSuggest.size(), greaterThanOrEqualTo(1)); assertThat(searchSuggest.size(), greaterThanOrEqualTo(1));
@ -121,31 +122,34 @@ public class ElasticsearchAssertions {
public static Matcher<SearchHit> hasIndex(final String index) { public static Matcher<SearchHit> hasIndex(final String index) {
return new ElasticsearchMatchers.SearchHitHasIndexMatcher(index); return new ElasticsearchMatchers.SearchHitHasIndexMatcher(index);
} }
public static <T extends Query> T assertBooleanSubQuery(Query query, Class<T> subqueryType, int i) { public static <T extends Query> T assertBooleanSubQuery(Query query, Class<T> subqueryType, int i) {
assertThat(query, instanceOf(BooleanQuery.class)); assertThat(query, instanceOf(BooleanQuery.class));
BooleanQuery q = (BooleanQuery) query; BooleanQuery q = (BooleanQuery) query;
assertThat(q.getClauses().length, greaterThan(i)); assertThat(q.getClauses().length, greaterThan(i));
assertThat(q.getClauses()[i].getQuery(), instanceOf(subqueryType)); assertThat(q.getClauses()[i].getQuery(), instanceOf(subqueryType));
return (T)q.getClauses()[i].getQuery(); return (T) q.getClauses()[i].getQuery();
} }
public static <E extends Throwable> void assertThrows(ActionFuture future, Class<E> exceptionClass) { public static <E extends Throwable> void assertThrows(ActionRequestBuilder<?, ?, ?> builder, Class<E> exceptionClass) {
boolean fail=false; assertThrows(builder.execute(), exceptionClass);
}
public static <E extends Throwable> void assertThrows(ActionFuture future, Class<E> exceptionClass) {
boolean fail = false;
try { try {
future.actionGet(); future.actionGet();
fail=true; fail = true;
} } catch (ElasticSearchException esException) {
catch (ElasticSearchException esException) {
assertThat(esException.unwrapCause(), instanceOf(exceptionClass)); assertThat(esException.unwrapCause(), instanceOf(exceptionClass));
} } catch (Throwable e) {
catch (Throwable e) {
assertThat(e, instanceOf(exceptionClass)); assertThat(e, instanceOf(exceptionClass));
} }
// has to be outside catch clause to get a proper message // has to be outside catch clause to get a proper message
if (fail) if (fail) {
throw new AssertionError("Expected a " + exceptionClass + " exception to be thrown"); throw new AssertionError("Expected a " + exceptionClass + " exception to be thrown");
}
} }
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRespon
import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
@ -46,7 +47,6 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.IndexTemplateMissingException;
@ -58,7 +58,6 @@ import org.testng.annotations.BeforeMethod;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -319,6 +318,10 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
return client().prepareIndex(index, type).setSource(source).execute().actionGet(); return client().prepareIndex(index, type).setSource(source).execute().actionGet();
} }
protected GetResponse get(String index, String type, String id) {
return client().prepareGet(index, type, id).execute().actionGet();
}
protected IndexResponse index(String index, String type, String id, String field, Object value) { protected IndexResponse index(String index, String type, String id, String field, Object value) {
return client().prepareIndex(index, type, id).setSource(field, value).execute().actionGet(); return client().prepareIndex(index, type, id).setSource(field, value).execute().actionGet();
} }

View File

@ -1,21 +1,23 @@
package org.elasticsearch.test.integration.document; package org.elasticsearch.test.integration.document;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/** /**
*/ */
public class BulkTests extends AbstractSharedClusterTest { public class BulkTests extends AbstractSharedClusterTest {
@ -105,6 +107,53 @@ public class BulkTests extends AbstractSharedClusterTest {
assertThat(((Long) getResponse.getField("field").getValue()), equalTo(4l)); assertThat(((Long) getResponse.getField("field").getValue()), equalTo(4l));
} }
@Test
public void testBulkVersioning() throws Exception {
createIndex("test");
ensureGreen();
BulkResponse bulkResponse = run(client().prepareBulk()
.add(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field", "1"))
.add(client().prepareIndex("test", "type", "2").setCreate(true).setSource("field", "1"))
.add(client().prepareIndex("test", "type", "1").setSource("field", "2")));
assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(1l));
assertTrue(((IndexResponse) bulkResponse.getItems()[1].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(1l));
assertFalse(((IndexResponse) bulkResponse.getItems()[2].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(2l));
bulkResponse = run(client().prepareBulk()
.add(client().prepareUpdate("test", "type", "1").setVersion(4l).setDoc("field", "2"))
.add(client().prepareUpdate("test", "type", "2").setDoc("field", "2"))
.add(client().prepareUpdate("test", "type", "1").setVersion(2l).setDoc("field", "3")));
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("Version"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(3l));
bulkResponse = run(client().prepareBulk()
.add(client().prepareIndex("test", "type", "e1").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e2").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e1").setSource("field", "2").setVersion(12).setVersionType(VersionType.EXTERNAL)));
assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(10l));
assertTrue(((IndexResponse) bulkResponse.getItems()[1].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(10l));
assertFalse(((IndexResponse) bulkResponse.getItems()[2].getResponse()).isCreated());
assertThat(((IndexResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(12l));
bulkResponse = run(client().prepareBulk()
.add(client().prepareUpdate("test", "type", "e1").setVersion(4l).setDoc("field", "2").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareUpdate("test", "type", "e2").setDoc("field", "2").setVersion(15).setVersionType(VersionType.EXTERNAL))
.add(client().prepareUpdate("test", "type", "e1").setVersion(2l).setDoc("field", "3").setVersion(15).setVersionType(VersionType.EXTERNAL)));
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("Version"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(15l));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(15l));
}
@Test @Test
public void testBulkUpdate_malformedScripts() throws Exception { public void testBulkUpdate_malformedScripts() throws Exception {
client().admin().indices().prepareDelete().execute().actionGet(); client().admin().indices().prepareDelete().execute().actionGet();
@ -140,7 +189,7 @@ public class BulkTests extends AbstractSharedClusterTest {
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2")); assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l)); assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l));
assertThat(((Integer)((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getGetResult().field("field").getValue()), equalTo(2)); assertThat(((Integer) ((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getGetResult().field("field").getValue()), equalTo(2));
assertThat(bulkResponse.getItems()[1].getFailure(), nullValue()); assertThat(bulkResponse.getItems()[1].getFailure(), nullValue());
assertThat(bulkResponse.getItems()[2].getFailure().getId(), equalTo("3")); assertThat(bulkResponse.getItems()[2].getFailure().getId(), equalTo("3"));
@ -182,7 +231,7 @@ public class BulkTests extends AbstractSharedClusterTest {
assertThat(response.getItems()[i].getOpType(), equalTo("update")); assertThat(response.getItems()[i].getOpType(), equalTo("update"));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i))); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i)));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(1l)); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(1l));
assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(1)); assertThat(((Integer) ((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(1));
for (int j = 0; j < 5; j++) { for (int j = 0; j < 5; j++) {
GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet();
@ -219,7 +268,7 @@ public class BulkTests extends AbstractSharedClusterTest {
assertThat(response.getItems()[i].getOpType(), equalTo("update")); assertThat(response.getItems()[i].getOpType(), equalTo("update"));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i))); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i)));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(2l)); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(2l));
assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(2)); assertThat(((Integer) ((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(2));
} }
builder = client().prepareBulk(); builder = client().prepareBulk();

View File

@ -29,7 +29,9 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -39,6 +41,7 @@ import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.testng.AssertJUnit.*; import static org.testng.AssertJUnit.*;
@ -163,22 +166,23 @@ public class UpdateTests extends AbstractSharedClusterTest {
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2")); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2"));
} }
} }
@Test @Test
public void testUpsertDoc() throws Exception { public void testUpsertDoc() throws Exception {
createIndex(); createIndex();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1") UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) .setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject())
.setDocAsUpsert(true) .setDocAsUpsert(true)
.setFields("_source") .setFields("_source")
.execute().actionGet(); .execute().actionGet();
assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult(), notNullValue());
assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz")); assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz"));
} }
@Test @Test
public void testUpsertFields() throws Exception { public void testUpsertFields() throws Exception {
createIndex(); createIndex();
@ -207,6 +211,60 @@ public class UpdateTests extends AbstractSharedClusterTest {
assertThat(updateResponse.getGetResult().sourceAsMap().get("extra").toString(), equalTo("foo")); assertThat(updateResponse.getGetResult().sourceAsMap().get("extra").toString(), equalTo("foo"));
} }
@Test
public void testVersionedUpdate() throws Exception {
createIndex("test");
ensureGreen();
index("test", "type", "1", "text", "value"); // version is now 1
assertThrows(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(2).execute(),
VersionConflictEngineException.class);
run(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(1));
assertThat(run(client().prepareGet("test", "type", "1")).getVersion(), equalTo(2l));
// and again with a higher version..
run(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v3'").setVersion(2));
assertThat(run(client().prepareGet("test", "type", "1")).getVersion(), equalTo(3l));
// after delete
run(client().prepareDelete("test", "type", "1"));
assertThrows(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(3).execute(),
DocumentMissingException.class);
// external versioning
run(client().prepareIndex("test", "type", "2").setSource("text", "value").setVersion(10).setVersionType(VersionType.EXTERNAL));
assertThrows(client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
VersionConflictEngineException.class);
run(client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(11).setVersionType(VersionType.EXTERNAL));
assertThat(run(client().prepareGet("test", "type", "2")).getVersion(), equalTo(11l));
// upserts - the combination with versions is a bit weird. Test are here to ensure we do not change our behavior unintentionally
// With internal versions, tt means "if object is there with version X, update it or explode. If it is not there, index.
run(client().prepareUpdate("test", "type", "3").setScript("ctx._source.text = 'v2'").setVersion(10).setUpsertRequest("{ \"text\": \"v0\" }"));
GetResponse get = get("test", "type", "3");
assertThat(get.getVersion(), equalTo(1l));
assertThat((String) get.getSource().get("text"), equalTo("v0"));
// With external versions, it means - if object is there with version lower than X, update it or explode. If it is not there, insert with new version.
run(client().prepareUpdate("test", "type", "4").setScript("ctx._source.text = 'v2'").
setVersion(10).setVersionType(VersionType.EXTERNAL).setUpsertRequest("{ \"text\": \"v0\" }"));
get = get("test", "type", "4");
assertThat(get.getVersion(), equalTo(10l));
assertThat((String) get.getSource().get("text"), equalTo("v0"));
// retry on conflict is rejected:
assertThrows(client().prepareUpdate("test", "type", "1").setVersion(10).setRetryOnConflict(5), ActionRequestValidationException.class);
}
@Test @Test
public void testIndexAutoCreation() throws Exception { public void testIndexAutoCreation() throws Exception {
try { try {
@ -390,10 +448,10 @@ public class UpdateTests extends AbstractSharedClusterTest {
assertThat(e.getMessage(), containsString("can't provide both script and doc")); assertThat(e.getMessage(), containsString("can't provide both script and doc"));
} }
} }
@Test @Test
public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception{ public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception {
createIndex(); createIndex();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));

View File

@ -0,0 +1,115 @@
package org.elasticsearch.test.unit.index;
/*
* Licensed to ElasticSearch under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class VersionTypeTests {
@Test
public void testInternalVersionConflict() throws Exception {
assertFalse(VersionType.INTERNAL.isVersionConflict(10, Versions.MATCH_ANY));
// if we don't have a version in the index we accept everything
assertFalse(VersionType.INTERNAL.isVersionConflict(Versions.NOT_SET, 10));
assertFalse(VersionType.INTERNAL.isVersionConflict(Versions.NOT_SET, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we don't like it unless MATCH_ANY
assertTrue(VersionType.INTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertTrue(VersionType.INTERNAL.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.INTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.MATCH_ANY));
// and the stupid usual case
assertFalse(VersionType.INTERNAL.isVersionConflict(10, 10));
assertTrue(VersionType.INTERNAL.isVersionConflict(9, 10));
assertTrue(VersionType.INTERNAL.isVersionConflict(10, 9));
// Old indexing code, dictating behavior
// if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) {
// // an explicit version is provided, see if there is a conflict
// // if we did not find anything, and a version is provided, so we do expect to find a doc under that version
// // this is important, since we don't allow to preset a version in order to handle deletes
// if (currentVersion == Versions.NOT_FOUND) {
// throw new VersionConflictEngineException(shardId, index.type(), index.id(), Versions.NOT_FOUND, expectedVersion);
// } else if (expectedVersion != currentVersion) {
// throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
// }
// }
// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
}
@Test
public void testExternalVersionConflict() throws Exception {
assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_SET, 10));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
assertTrue(VersionType.EXTERNAL.isVersionConflict(10, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we always accept
assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.MATCH_ANY));
// and the standard behavior
assertTrue(VersionType.EXTERNAL.isVersionConflict(10, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflict(9, 10));
assertTrue(VersionType.EXTERNAL.isVersionConflict(10, 9));
// Old indexing code, dictating behavior
// // an external version is provided, just check, if a local version exists, that its higher than it
// // the actual version checking is one in an external system, and we just want to not index older versions
// if (currentVersion >= 0) { // we can check!, its there
// if (currentVersion >= index.version()) {
// throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, index.version());
// }
// }
// updatedVersion = index.version();
}
@Test
public void testUpdateVersion() {
assertThat(VersionType.INTERNAL.updateVersion(Versions.NOT_SET, 10), equalTo(1l));
assertThat(VersionType.INTERNAL.updateVersion(Versions.NOT_FOUND, 10), equalTo(1l));
assertThat(VersionType.INTERNAL.updateVersion(1, 1), equalTo(2l));
assertThat(VersionType.INTERNAL.updateVersion(2, Versions.MATCH_ANY), equalTo(3l));
assertThat(VersionType.EXTERNAL.updateVersion(Versions.NOT_SET, 10), equalTo(10l));
assertThat(VersionType.EXTERNAL.updateVersion(Versions.NOT_FOUND, 10), equalTo(10l));
assertThat(VersionType.EXTERNAL.updateVersion(1, 10), equalTo(10l));
// Old indexing code
// if (index.versionType() == VersionType.INTERNAL) { // internal version type
// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
// } else { // external version type
// updatedVersion = expectedVersion;
// }
}
}