diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 44e3e367728..201cd90fd40 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -232,7 +232,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation BytesReference indexSourceAsBytes = indexRequest.source(); // add the response IndexResponse indexResponse = result.response(); - UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion()); + UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), + indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated()); updateResponse.setMatches(indexResponse.getMatches()); if (updateRequest.fields() != null && updateRequest.fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); @@ -258,7 +259,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation case DELETE: DeleteResponse response = updateResult.writeResult.response(); DeleteRequest deleteRequest = updateResult.request(); - updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); // Replace the update request to the translated delete request to execute on the replica. @@ -376,17 +377,20 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); long version; + boolean created; Engine.IndexingOperation op; if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); indexShard.index(index); version = index.version(); op = index; + created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); indexShard.create(create); version = create.version(); op = create; + created = true; } long preVersion = indexRequest.version(); // update the version on request so it will happen on the replicas @@ -403,7 +407,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation op = null; } - IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version); + IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version, created); return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op); } diff --git a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index c6483ce3e91..c8789c5e33e 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.service.IndexShard; @@ -110,12 +111,12 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct @Override public void onResponse(IndexDeleteResponse indexDeleteResponse) { // go over the response, see if we have found one, and the version if found - long version = 0; + long version = Versions.MATCH_ANY; boolean found = false; for (ShardDeleteResponse deleteResponse : indexDeleteResponse.getResponses()) { if (!deleteResponse.isNotFound()) { - found = true; version = deleteResponse.getVersion(); + found = true; break; } } diff --git a/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/src/main/java/org/elasticsearch/action/index/IndexResponse.java index f897ea9aecc..6b2a39fd129 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -40,17 +40,19 @@ public class IndexResponse extends ActionResponse { private String id; private String type; private long version; + private boolean created; private List matches; public IndexResponse() { } - public IndexResponse(String index, String type, String id, long version) { + public IndexResponse(String index, String type, String id, long version, boolean created) { this.index = index; this.id = id; this.type = type; this.version = version; + this.created = created; } /** @@ -75,12 +77,19 @@ public class IndexResponse extends ActionResponse { } /** - * Returns the version of the doc indexed. + * Returns the current version of the doc indexed. */ public long getVersion() { return this.version; } + /** + * Returns true if the document was created, false if updated. + */ + public boolean isCreated() { + return this.created; + } + /** * Returns the percolate queries matches. null if no percolation was requested. */ @@ -102,6 +111,7 @@ public class IndexResponse extends ActionResponse { id = in.readString(); type = in.readString(); version = in.readLong(); + created = in.readBoolean(); if (in.readBoolean()) { int size = in.readVInt(); if (size == 0) { @@ -132,6 +142,7 @@ public class IndexResponse extends ActionResponse { out.writeString(id); out.writeString(type); out.writeLong(version); + out.writeBoolean(created); if (matches == null) { out.writeBoolean(false); } else { diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 1c39c68e9eb..2eeab85b8e9 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -198,6 +198,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; + boolean created; Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse) @@ -207,6 +208,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi indexShard.index(index); version = index.version(); op = index; + created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse) .version(request.version()) @@ -215,6 +217,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi indexShard.create(create); version = create.version(); op = create; + created = true; } if (request.refresh()) { try { @@ -229,7 +232,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi // update the version on the request, so it will be used for the replicas request.version(version); - IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version); + IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version, created); return new PrimaryResponse(shardRequest.request, response, op); } diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index b1f8dd245a3..53cdcf13287 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -193,7 +193,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(upsertRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), + response.getVersion(), response.isCreated()); update.setMatches(response.getMatches()); if (request.fields() != null && request.fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); @@ -229,7 +230,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), + response.getVersion(), response.isCreated()); update.setMatches(response.getMatches()); update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); listener.onResponse(update); @@ -258,7 +260,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio deleteAction.execute(deleteRequest, new ActionListener() { @Override public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); listener.onResponse(update); } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 5c6936e8cb4..c22c8497715 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -149,6 +149,7 @@ public class UpdateHelper extends AbstractComponent { } // TODO: external version type, does it make sense here? does not seem like it... + // TODO: because we use getResult.getVersion we loose the doc.version. The question is where is the right place? if (operation == null || "index".equals(operation)) { final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) @@ -164,12 +165,12 @@ public class UpdateHelper extends AbstractComponent { deleteRequest.operationThreaded(false); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); + UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null)); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); } else { logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script); - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); + UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); } } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index 8b5dc99723b..6c8ffff54f0 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -37,6 +37,7 @@ public class UpdateResponse extends ActionResponse { private String id; private String type; private long version; + private boolean created; private List matches; private GetResult getResult; @@ -44,11 +45,12 @@ public class UpdateResponse extends ActionResponse { } - public UpdateResponse(String index, String type, String id, long version) { + public UpdateResponse(String index, String type, String id, long version, boolean created) { this.index = index; this.id = id; this.type = type; this.version = version; + this.created = created; } /** @@ -73,7 +75,7 @@ public class UpdateResponse extends ActionResponse { } /** - * Returns the version of the doc indexed. + * Returns the current version of the doc indexed. */ public long getVersion() { return this.version; @@ -94,6 +96,14 @@ public class UpdateResponse extends ActionResponse { return this.getResult; } + + /** + * Returns true if document was created due to an UPSERT operation + */ + public boolean isCreated() { + return this.created; + + } /** * Internal. */ @@ -108,6 +118,7 @@ public class UpdateResponse extends ActionResponse { id = in.readString(); type = in.readString(); version = in.readLong(); + created = in.readBoolean(); if (in.readBoolean()) { int size = in.readVInt(); if (size == 0) { @@ -141,6 +152,7 @@ public class UpdateResponse extends ActionResponse { out.writeString(id); out.writeString(type); out.writeLong(version); + out.writeBoolean(created); if (matches == null) { out.writeBoolean(false); } else { diff --git a/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java b/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java index 844e18f829a..dbdf25f6678 100644 --- a/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java +++ b/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java @@ -32,6 +32,7 @@ import java.util.List; /** Utility class to resolve the Lucene doc ID and version for a given uid. */ public class Versions { + public static final long MATCH_ANY = 0L; // Version was not specified by the user public static final long NOT_FOUND = -1L; public static final long NOT_SET = -2L; diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 28d5a445c5e..549186c09ee 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -379,7 +379,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent { private final DocumentMapper docMapper; private final Term uid; private final ParsedDocument doc; - private long version; + private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; private Origin origin = Origin.PRIMARY; @@ -506,9 +506,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent { private final DocumentMapper docMapper; private final Term uid; private final ParsedDocument doc; - private long version; + private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; private Origin origin = Origin.PRIMARY; + private boolean created; private long startTime; private long endTime; @@ -554,6 +555,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this; } + /** + * before indexing holds the version requested, after indexing holds the new version of the document. + */ public long version() { return this.version; } @@ -627,13 +631,24 @@ public interface Engine extends IndexShardComponent, CloseableComponent { public long endTime() { return this.endTime; } + + /** + * @return true if object was created + */ + public boolean created() { + return created; + } + + public void created(boolean created) { + this.created = created; + } } static class Delete implements Operation { private final String type; private final String id; private final Term uid; - private long version; + private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; private Origin origin = Origin.PRIMARY; private boolean notFound; @@ -679,6 +694,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this; } + /** + * before delete execution this is the version to be deleted. After this is the version of the "delete" transaction record. + */ public long version() { return this.version; } @@ -701,7 +719,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this; } - public Delete startTime(long startTime) { this.startTime = startTime; return this; @@ -836,7 +853,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent { private final Versions.DocIdAndVersion docIdAndVersion; private final Searcher searcher; - public static final GetResult NOT_EXISTS = new GetResult(false, -1, null); + public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null); public GetResult(boolean exists, long version, @Nullable Translog.Source source) { this.source = source; diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 0ba937b7bfa..39050402730 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -386,7 +386,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { currentVersion = loadCurrentVersionFromIndex(create.uid()); } else { if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { - currentVersion = -1; // deleted, and GC + currentVersion = Versions.NOT_FOUND; // deleted, and GC } else { currentVersion = versionValue.version(); } @@ -397,18 +397,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (create.origin() == Operation.Origin.PRIMARY) { if (create.versionType() == VersionType.INTERNAL) { // internal version type long expectedVersion = create.version(); - if (expectedVersion != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore... + if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { // an explicit version is provided, see if there is a conflict - // if the current version is -1, means we did not find anything, and - // a version is provided, so we do expect to find a doc under that version + // if we did not find anything, and a version is provided, so we do expect to find a doc under that version // this is important, since we don't allow to preset a version in order to handle deletes - if (currentVersion == -1) { - throw new VersionConflictEngineException(shardId, create.type(), create.id(), -1, expectedVersion); + if (currentVersion == Versions.NOT_FOUND) { + throw new VersionConflictEngineException(shardId, create.type(), create.id(), Versions.NOT_FOUND, expectedVersion); } else if (expectedVersion != currentVersion) { throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); } + } - updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1; + updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; } else { // external version type // an external version is provided, just check, if a local version exists, that its higher than it // the actual version checking is one in an external system, and we just want to not index older versions @@ -421,10 +421,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { long expectedVersion = create.version(); - if (currentVersion != -2) { // -2 means we don't have a version, so ignore... + if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) // then nothing to check - if (!(currentVersion == -1 && create.version() == 1)) { + if (!(currentVersion == Versions.NOT_FOUND && create.version() == 1)) { // with replicas/recovery, we only check for previous version, we allow to set a future version if (expectedVersion <= currentVersion) { if (create.origin() == Operation.Origin.RECOVERY) { @@ -448,7 +448,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { throw new DocumentAlreadyExistsException(shardId, create.type(), create.id()); } } - } else if (currentVersion != -1) { + } else if (currentVersion != Versions.NOT_FOUND) { // its not deleted, its already there if (create.origin() == Operation.Origin.RECOVERY) { return; @@ -509,7 +509,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { currentVersion = loadCurrentVersionFromIndex(index.uid()); } else { if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { - currentVersion = -1; // deleted, and GC + currentVersion = Versions.NOT_FOUND; // deleted, and GC } else { currentVersion = versionValue.version(); } @@ -519,18 +519,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { if (index.origin() == Operation.Origin.PRIMARY) { if (index.versionType() == VersionType.INTERNAL) { // internal version type long expectedVersion = index.version(); - if (expectedVersion != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore... + if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { // an explicit version is provided, see if there is a conflict - // if the current version is -1, means we did not find anything, and - // a version is provided, so we do expect to find a doc under that version + // if we did not find anything, and a version is provided, so we do expect to find a doc under that version // this is important, since we don't allow to preset a version in order to handle deletes - if (currentVersion == -1) { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), -1, expectedVersion); + if (currentVersion == Versions.NOT_FOUND) { + throw new VersionConflictEngineException(shardId, index.type(), index.id(), Versions.NOT_FOUND, expectedVersion); } else if (expectedVersion != currentVersion) { throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); } } - updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1; + updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; + } else { // external version type // an external version is provided, just check, if a local version exists, that its higher than it // the actual version checking is one in an external system, and we just want to not index older versions @@ -543,10 +543,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { long expectedVersion = index.version(); - if (currentVersion != -2) { // -2 means we don't have a version, so ignore... + if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) // then nothing to check - if (!(currentVersion == -1 && index.version() == 1)) { + if (!(currentVersion == Versions.NOT_FOUND && index.version() == 1)) { // with replicas/recovery, we only check for previous version, we allow to set a future version if (expectedVersion <= currentVersion) { if (index.origin() == Operation.Origin.RECOVERY) { @@ -562,15 +562,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } index.version(updatedVersion); - - if (currentVersion == -1) { + if (currentVersion == Versions.NOT_FOUND) { // document does not exists, we can optimize for create + index.created(true); if (index.docs().size() > 1) { writer.addDocuments(index.docs(), index.analyzer()); } else { writer.addDocument(index.docs().get(0), index.analyzer()); } } else { + if (versionValue != null) index.created(versionValue.delete()); // we have a delete which is not GC'ed... if (index.docs().size() > 1) { writer.updateDocuments(index.uid(), index.docs(), index.analyzer()); } else { @@ -621,7 +622,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { currentVersion = loadCurrentVersionFromIndex(delete.uid()); } else { if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { - currentVersion = -1; // deleted, and GC + currentVersion = Versions.NOT_FOUND; // deleted, and GC } else { currentVersion = versionValue.version(); } @@ -630,21 +631,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { long updatedVersion; if (delete.origin() == Operation.Origin.PRIMARY) { if (delete.versionType() == VersionType.INTERNAL) { // internal version type - if (delete.version() != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore... + if (delete.version() != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... // an explicit version is provided, see if there is a conflict - // if the current version is -1, means we did not find anything, and - // a version is provided, so we do expect to find a doc under that version - if (currentVersion == -1) { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), -1, delete.version()); + // if we did not find anything and a version is provided, so we do expect to find a doc under that version + if (currentVersion == Versions.NOT_FOUND) { + throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), Versions.NOT_FOUND, delete.version()); } else if (delete.version() != currentVersion) { throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version()); } } - updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1; + updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; + } else { // External - if (currentVersion == -1) { + if (currentVersion == Versions.NOT_FOUND) { // its an external version, that's fine, we allow it to be set - //throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), -1, delete.version()); + //throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), UidField.DocIdAndVersion.Versions.NOT_FOUND, delete.version()); } else if (currentVersion >= delete.version()) { throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version()); } @@ -652,9 +653,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { // on replica, the version is the future value expected (returned from the operation on the primary) - if (currentVersion != -2) { // -2 means we don't have a version in the index, ignore + if (currentVersion != Versions.NOT_SET) { // we don't have a version in the index, ignore // only check if we have a version for it, otherwise, ignore (see later) - if (currentVersion != -1) { + if (currentVersion != Versions.NOT_FOUND) { // with replicas, we only check for previous version, we allow to set a future version if (delete.version() <= currentVersion) { if (delete.origin() == Operation.Origin.RECOVERY) { @@ -669,7 +670,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { updatedVersion = delete.version(); } - if (currentVersion == -1) { + if (currentVersion == Versions.NOT_FOUND) { // doc does not exists and no prior deletes delete.version(updatedVersion).notFound(true); Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); @@ -1357,7 +1358,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { class ApplySettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { - long gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis)).millis(); + long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis)).millis(); if (gcDeletesInMillis != RobinEngine.this.gcDeletesInMillis) { logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); RobinEngine.this.gcDeletesInMillis = gcDeletesInMillis; diff --git a/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index c89e8bb552f..b6f827013bc 100644 --- a/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -124,7 +124,7 @@ public class RestIndexAction extends BaseRestHandler { } builder.endObject(); RestStatus status = OK; - if (response.getVersion() == 1) { + if (response.isCreated()) { status = CREATED; } channel.sendResponse(new XContentRestResponse(request, status, builder)); diff --git a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index b154fe8eec1..30c8a106b1e 100644 --- a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -149,7 +149,7 @@ public class RestUpdateAction extends BaseRestHandler { } builder.endObject(); RestStatus status = OK; - if (response.getVersion() == 1) { + if (response.isCreated()) { status = CREATED; } channel.sendResponse(new XContentRestResponse(request, status, builder)); diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 7d31e9af955..616d650d37a 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -18,20 +18,22 @@ */ package org.elasticsearch.test.hamcrest; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; - -import java.util.Arrays; - import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.suggest.Suggest; import org.hamcrest.Matcher; +import java.util.Arrays; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + /** * */ @@ -128,4 +130,22 @@ public class ElasticsearchAssertions { return (T)q.getClauses()[i].getQuery(); } + public static void assertThrows(ActionFuture future, Class exceptionClass) { + boolean fail=false; + try { + future.actionGet(); + fail=true; + + } + catch (ElasticSearchException esException) { + assertThat(esException.unwrapCause(), instanceOf(exceptionClass)); + } + catch (Throwable e) { + assertThat(e, instanceOf(exceptionClass)); + } + // has to be outside catch clause to get a proper message + if (fail) + throw new AssertionError("Expected a " + exceptionClass + " exception to be thrown"); + } + } diff --git a/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java b/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java index 89462d19d93..9e10229b078 100644 --- a/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java +++ b/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRespon import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.client.AdminClient; @@ -314,8 +315,12 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase { } // utils - protected void index(String index, String type, XContentBuilder source) { - client().prepareIndex(index, type).setSource(source).execute().actionGet(); + protected IndexResponse index(String index, String type, XContentBuilder source) { + return client().prepareIndex(index, type).setSource(source).execute().actionGet(); + } + + protected IndexResponse index(String index, String type, String id, String field, Object value) { + return client().prepareIndex(index, type, id).setSource(field, value).execute().actionGet(); } protected RefreshResponse refresh() { diff --git a/src/test/java/org/elasticsearch/test/integration/indexing/IndexActionTests.java b/src/test/java/org/elasticsearch/test/integration/indexing/IndexActionTests.java new file mode 100644 index 00000000000..da1d8b8df95 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indexing/IndexActionTests.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indexing; + +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.test.integration.AbstractSharedClusterTest; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicIntegerArray; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +/** + * + */ +public class IndexActionTests extends AbstractSharedClusterTest { + + @Test + public void testCreatedFlag() throws Exception { + createIndex("test"); ensureGreen(); + + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); + assertTrue(indexResponse.isCreated()); + + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").execute().actionGet(); + assertFalse(indexResponse.isCreated()); + + client().prepareDelete("test", "type", "1").execute().actionGet(); + + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").execute().actionGet(); + assertTrue(indexResponse.isCreated()); + + } + + @Test + public void testCreatedFlagWithFlush() throws Exception { + createIndex("test"); ensureGreen(); + + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); + assertTrue(indexResponse.isCreated()); + + client().prepareDelete("test", "type", "1").execute().actionGet(); + + flush(); + + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").execute().actionGet(); + assertTrue(indexResponse.isCreated()); + } + + @Test + public void testCreatedFlagParallelExecution() throws Exception { + createIndex("test"); ensureGreen(); + + int threadCount = 20; + final int docCount = 300; + int taskCount = docCount * threadCount; + + final AtomicIntegerArray createdCounts = new AtomicIntegerArray(docCount); + ExecutorService threadPool = Executors.newFixedThreadPool(threadCount); + List> tasks = new ArrayList>(taskCount); + final Random random = new Random(); + for (int i=0;i< taskCount; i++ ) { + tasks.add(new Callable() { + @Override + public Void call() throws Exception { + int docId = random.nextInt(docCount); + IndexResponse indexResponse = index("test", "type", Integer.toString(docId), "field1", "value"); + if (indexResponse.isCreated()) createdCounts.incrementAndGet(docId); + return null; + } + }); + } + + threadPool.invokeAll(tasks); + + for (int i=0;i creating index test"); + client().admin().indices().prepareCreate("test") .addMapping("type1", XContentFactory.jsonBuilder() .startObject() @@ -145,20 +147,23 @@ public class UpdateTests extends AbstractSharedClusterTest { assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - client().prepareUpdate("test", "type1", "1") + UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1") .setUpsertRequest(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setScript("ctx._source.field += 1") .execute().actionGet(); + assertTrue(updateResponse.isCreated()); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("1")); } - client().prepareUpdate("test", "type1", "1") + updateResponse = client().prepareUpdate("test", "type1", "1") .setUpsertRequest(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setScript("ctx._source.field += 1") .execute().actionGet(); + assertFalse(updateResponse.isCreated()); + for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); @@ -231,6 +236,7 @@ public class UpdateTests extends AbstractSharedClusterTest { UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx._source.field += 1").execute().actionGet(); assertThat(updateResponse.getVersion(), equalTo(2L)); + assertFalse(updateResponse.isCreated()); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); @@ -239,6 +245,7 @@ public class UpdateTests extends AbstractSharedClusterTest { updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx._source.field += count").addScriptParam("count", 3).execute().actionGet(); assertThat(updateResponse.getVersion(), equalTo(3L)); + assertFalse(updateResponse.isCreated()); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); @@ -248,6 +255,7 @@ public class UpdateTests extends AbstractSharedClusterTest { // check noop updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx.op = 'none'").execute().actionGet(); assertThat(updateResponse.getVersion(), equalTo(3L)); + assertFalse(updateResponse.isCreated()); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); @@ -257,6 +265,7 @@ public class UpdateTests extends AbstractSharedClusterTest { // check delete updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx.op = 'delete'").execute().actionGet(); assertThat(updateResponse.getVersion(), equalTo(4L)); + assertFalse(updateResponse.isCreated()); for (int i = 0; i < 5; i++) { GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java b/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java index 753a3d14415..c68704bd94a 100644 --- a/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java +++ b/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java @@ -19,56 +19,51 @@ package org.elasticsearch.test.integration.versioning; -import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.testng.annotations.Test; +import java.util.HashMap; + import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; /** * */ public class SimpleVersioningTests extends AbstractSharedClusterTest { - @Test public void testExternalVersioningInitialDelete() throws Exception { - client().admin().indices().prepareDelete().execute().actionGet(); - - client().admin().indices().prepareCreate("test").execute().actionGet(); - client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet(); + createIndex("test"); ensureGreen(); + // Note - external version doesn't throw version conflicts on deletes of non existent records. This is different from internal versioning DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThat(deleteResponse.isNotFound(), equalTo(true)); - try { - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + // this should conflict with the delete command transaction which told us that the object was deleted at version 17. + assertThrows( + client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute(), + VersionConflictEngineException.class + ); - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(18).setVersionType(VersionType.EXTERNAL).execute().actionGet(); + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(18). + setVersionType(VersionType.EXTERNAL).execute().actionGet(); + assertThat(indexResponse.getVersion(), equalTo(18L)); } @Test public void testExternalVersioning() throws Exception { - try { - client().admin().indices().prepareDelete("test").execute().actionGet(); - } catch (IndexMissingException e) { - // its ok - } - client().admin().indices().prepareCreate("test").execute().actionGet(); - client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet(); + createIndex("test"); ensureGreen(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(12).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThat(indexResponse.getVersion(), equalTo(12l)); @@ -76,41 +71,73 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest { indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(14).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThat(indexResponse.getVersion(), equalTo(14l)); - try { - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute(), + VersionConflictEngineException.class); client().admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(14l)); } + // deleting with a lower version fails. + assertThrows( + client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(), + VersionConflictEngineException.class); + + // Delete with a higher version deletes all versions up to the given one. DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThat(deleteResponse.isNotFound(), equalTo(false)); assertThat(deleteResponse.getVersion(), equalTo(17l)); - try { - client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + // Deleting with a lower version keeps on failing after a delete. + assertThrows( + client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(), + VersionConflictEngineException.class); + + // But delete with a higher version is OK. deleteResponse = client().prepareDelete("test", "type", "1").setVersion(18).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThat(deleteResponse.isNotFound(), equalTo(true)); assertThat(deleteResponse.getVersion(), equalTo(18l)); + + + // TODO: This behavior breaks rest api returning http status 201, good news is that it this is only the case until deletes GC kicks in. + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(19).setVersionType(VersionType.EXTERNAL).execute().actionGet(); + assertThat(indexResponse.getVersion(), equalTo(19l)); + + + deleteResponse = client().prepareDelete("test", "type", "1").setVersion(20).setVersionType(VersionType.EXTERNAL).execute().actionGet(); + assertThat(deleteResponse.isNotFound(), equalTo(false)); + assertThat(deleteResponse.getVersion(), equalTo(20l)); + + // Make sure that the next delete will be GC. Note we do it on the index settings so it will be cleaned up + HashMap newSettings = new HashMap(); + newSettings.put("index.gc_deletes",-1); + client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet(); + + Thread.sleep(300); // gc works based on estimated sampled time. Give it a chance... + + // And now we have previous version return -1 + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(20).setVersionType(VersionType.EXTERNAL).execute().actionGet(); + assertThat(indexResponse.getVersion(), equalTo(20l)); } @Test - public void testSimpleVersioning() throws Exception { - try { - client().admin().indices().prepareDelete("test").execute().actionGet(); - } catch (IndexMissingException e) { - // its ok - } - client().admin().indices().prepareCreate("test").execute().actionGet(); - client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet(); + public void testInternalVersioningInitialDelete() throws Exception { + createIndex("test"); ensureGreen(); + + assertThrows(client().prepareDelete("test", "type", "1").setVersion(17).execute(), + VersionConflictEngineException.class); + + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1") + .setCreate(true).execute().actionGet(); + assertThat(indexResponse.getVersion(), equalTo(1l)); + } + + + @Test + public void testInternalVersioning() throws Exception { + createIndex("test"); ensureGreen(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); assertThat(indexResponse.getVersion(), equalTo(1l)); @@ -118,41 +145,31 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest { indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); assertThat(indexResponse.getVersion(), equalTo(2l)); - try { - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows( + client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); - try { - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows( + client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); - try { - client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows( + client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); + assertThrows( + client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); - try { - client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows( + client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(), + DocumentAlreadyExistsException.class); + assertThrows( + client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(), + DocumentAlreadyExistsException.class); - try { - client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } - try { - client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); client().admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { @@ -168,19 +185,18 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest { // search without versioning for (int i = 0; i < 10; i++) { SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); - assertThat(searchResponse.getHits().getAt(0).version(), equalTo(-1l)); + assertThat(searchResponse.getHits().getAt(0).version(), equalTo(Versions.NOT_FOUND)); } DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet(); assertThat(deleteResponse.isNotFound(), equalTo(false)); assertThat(deleteResponse.getVersion(), equalTo(3l)); - try { - client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareDelete("test", "type", "1").setVersion(2).execute(), VersionConflictEngineException.class); + + // This is intricate - the object was deleted but a delete transaction was with the right version. We add another one + // and thus the transcation is increased. deleteResponse = client().prepareDelete("test", "type", "1").setVersion(3).execute().actionGet(); assertThat(deleteResponse.isNotFound(), equalTo(true)); assertThat(deleteResponse.getVersion(), equalTo(4l)); @@ -188,13 +204,7 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest { @Test public void testSimpleVersioningWithFlush() throws Exception { - try { - client().admin().indices().prepareDelete("test").execute().actionGet(); - } catch (IndexMissingException e) { - // its ok - } - client().admin().indices().prepareCreate("test").execute().actionGet(); - client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet(); + createIndex("test"); ensureGreen(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); assertThat(indexResponse.getVersion(), equalTo(1l)); @@ -206,41 +216,19 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest { client().admin().indices().prepareFlush().execute().actionGet(); - try { - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); - try { - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); + assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); - try { - client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(), + VersionConflictEngineException.class); - try { - client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } - - try { - client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } - - try { - client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); - } catch (ElasticSearchException e) { - assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); - } + assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); client().admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { @@ -255,13 +243,7 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest { @Test public void testVersioningWithBulk() { - try { - client().admin().indices().prepareDelete("test").execute().actionGet(); - } catch (IndexMissingException e) { - // its ok - } - client().admin().indices().prepareCreate("test").execute().actionGet(); - client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet(); + createIndex("test"); ensureGreen(); BulkResponse bulkResponse = client().prepareBulk().add(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1")).execute().actionGet(); assertThat(bulkResponse.hasFailures(), equalTo(false)); diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java index 7092943a8a1..571b0b8e4a0 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java @@ -30,6 +30,7 @@ import org.apache.lucene.search.TermQuery; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; @@ -72,6 +73,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; /** * @@ -974,6 +977,41 @@ public abstract class AbstractSimpleEngineTests { } } + + @Test + public void testBasicCreatedFlag() { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); + Engine.Index index = new Engine.Index(null, newUid("1"), doc); + engine.index(index); + assertTrue(index.created()); + + index = new Engine.Index(null, newUid("1"), doc); + engine.index(index); + assertFalse(index.created()); + + engine.delete(new Engine.Delete(null, "1", newUid("1"))); + + index = new Engine.Index(null, newUid("1"), doc); + engine.index(index); + assertTrue(index.created()); + } + + @Test + public void testCreatedFlagAfterFlush() { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); + Engine.Index index = new Engine.Index(null, newUid("1"), doc); + engine.index(index); + assertTrue(index.created()); + + engine.delete(new Engine.Delete(null, "1", newUid("1"))); + + engine.flush(new Engine.Flush()); + + index = new Engine.Index(null, newUid("1"), doc); + engine.index(index); + assertTrue(index.created()); + } + protected Term newUid(String id) { return new Term("_uid", id); }