From b7325d005ba97527a69f8cd927baa5ed40f25015 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 25 Apr 2014 09:32:40 +0200 Subject: [PATCH] Make Create/Update/Delete classes less mutable Today we use a builder pattern / setters to set relevant information to Engine#Delete|Create|Index. Yet almost all the values are required but they are not passed via ctor arguments but via an error prone builder pattern. If we add a required argument we should see compile errors on that level to make sure we don't miss any place to set them. Prerequisite for #5917 --- .../action/bulk/TransportShardBulkAction.java | 19 +- .../action/delete/TransportDeleteAction.java | 7 +- .../index/TransportShardDeleteAction.java | 8 +- .../TransportShardDeleteByQueryAction.java | 6 +- .../action/index/TransportIndexAction.java | 20 +- .../elasticsearch/index/engine/Engine.java | 413 ++++++------------ .../index/engine/internal/InternalEngine.java | 10 +- .../index/shard/service/IndexShard.java | 9 +- .../shard/service/InternalIndexShard.java | 37 +- .../engine/internal/InternalEngineTests.java | 90 ++-- 10 files changed, 229 insertions(+), 390 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 6024cac5efd..8cb4f8ff3f8 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation Engine.IndexingOperation op; try { if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY); if (index.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -419,7 +419,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation op = index; created = index.created(); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY); if (create.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -443,7 +443,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) { - Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY); indexShard.delete(delete); // update the request with the version so it will go to the replicas deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); @@ -561,14 +561,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse) - .version(indexRequest.version()).versionType(indexRequest.versionType()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA); indexShard.index(index); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse) - .version(indexRequest.version()).versionType(indexRequest.versionType()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.Create create = indexShard.prepareCreate(sourceToParse, + indexRequest.version(), indexRequest.versionType(), + Engine.Operation.Origin.REPLICA); indexShard.create(create); } } catch (Throwable e) { @@ -577,8 +575,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else if (item.request() instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) item.request(); try { - Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()) - .versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.REPLICA); + Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA); indexShard.delete(delete); } catch (Throwable e) { // ignore, we are on backup diff --git a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 3409b1f2a2e..b05076334d6 100644 --- a/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -185,9 +185,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); - Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) - .versionType(request.versionType()) - .origin(Engine.Operation.Origin.PRIMARY); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); indexShard.delete(delete); // update the request with teh version so it will go to the replicas request.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); @@ -211,8 +209,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); - Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()).versionType(request.versionType()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); indexShard.delete(delete); diff --git a/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java b/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java index 625496b6b7e..b86cf18150d 100644 --- a/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java +++ b/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java @@ -93,8 +93,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { ShardDeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); - Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) - .origin(Engine.Operation.Origin.PRIMARY); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY); indexShard.delete(delete); // update the version to happen on the replicas request.version(delete.version()); @@ -116,11 +115,10 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { ShardDeleteRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); - Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.REPLICA); // IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version - delete.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()); + delete = new Engine.Delete(delete, VersionType.INTERNAL.versionTypeForReplicationAndRecovery()); assert delete.versionType().validateVersion(delete.version()); diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index f4725bfe572..d029f705745 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -119,8 +119,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { - Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types()) - .origin(Engine.Operation.Origin.PRIMARY); + Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.PRIMARY, request.types()); SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.of())); indexShard.deleteByQuery(deleteByQuery); } finally { @@ -142,8 +141,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { - Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.REPLICA, request.types()); SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.of())); indexShard.deleteByQuery(deleteByQuery); } finally { diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 7436a43b551..a0bf4933e0f 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -191,10 +191,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi boolean created; Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse) - .version(request.version()) - .versionType(request.versionType()) - .origin(Engine.Operation.Origin.PRIMARY); + Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); if (index.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } @@ -203,10 +200,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi op = index; created = index.created(); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse) - .version(request.version()) - .versionType(request.versionType()) - .origin(Engine.Operation.Origin.PRIMARY); + Engine.Create create = indexShard.prepareCreate(sourceToParse, + request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); if (create.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } @@ -240,14 +235,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse) - .version(request.version()).versionType(request.versionType()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); indexShard.index(index); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse) - .version(request.version()).versionType(request.versionType()) - .origin(Engine.Operation.Origin.REPLICA); + Engine.Create create = indexShard.prepareCreate(sourceToParse, + request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); indexShard.create(create); } if (request.refresh()) { diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index ea88130a699..8a8128f6af1 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -376,163 +376,142 @@ public interface Engine extends IndexShardComponent, CloseableComponent { Origin origin(); } - static interface IndexingOperation extends Operation { + static abstract class IndexingOperation implements Operation { - ParsedDocument parsedDoc(); - - List docs(); - - DocumentMapper docMapper(); - } - - static class Create implements IndexingOperation { private final DocumentMapper docMapper; private final Term uid; private final ParsedDocument doc; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; - private Origin origin = Origin.PRIMARY; + private long version; + private final VersionType versionType; + private final Origin origin; - private long startTime; + private final long startTime; private long endTime; - public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) { + public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) { this.docMapper = docMapper; this.uid = uid; this.doc = doc; + this.version = version; + this.versionType = versionType; + this.origin = origin; + this.startTime = startTime; + } + + public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc) { + this(docMapper, uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime()); + } + + public DocumentMapper docMapper() { + return this.docMapper; } @Override - public DocumentMapper docMapper() { - return this.docMapper; + public Origin origin() { + return this.origin; + } + + public ParsedDocument parsedDoc() { + return this.doc; + } + + public Term uid() { + return this.uid; + } + + public String type() { + return this.doc.type(); + } + + public String id() { + return this.doc.id(); + } + + public String routing() { + return this.doc.routing(); + } + + public long timestamp() { + return this.doc.timestamp(); + } + + public long ttl() { + return this.doc.ttl(); + } + + public long version() { + return this.version; + } + + public void updateVersion(long version) { + this.version = version; + this.doc.version().setLongValue(version); + } + + public VersionType versionType() { + return this.versionType; + } + + public String parent() { + return this.doc.parent(); + } + + public List docs() { + return this.doc.docs(); + } + + public Analyzer analyzer() { + return this.doc.analyzer(); + } + + public BytesReference source() { + return this.doc.source(); + } + + /** + * Returns operation start time in nanoseconds. + */ + public long startTime() { + return this.startTime; + } + + public void endTime(long endTime) { + this.endTime = endTime; + } + + /** + * Returns operation end time in nanoseconds. + */ + public long endTime() { + return this.endTime; + } + } + + static final class Create extends IndexingOperation { + + public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) { + super(docMapper, uid, doc, version, versionType, origin, startTime); + } + + public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) { + super(docMapper, uid, doc); } @Override public Type opType() { return Type.CREATE; } - - public Create origin(Origin origin) { - this.origin = origin; - return this; - } - - @Override - public Origin origin() { - return this.origin; - } - - @Override - public ParsedDocument parsedDoc() { - return this.doc; - } - - public Term uid() { - return this.uid; - } - - public String type() { - return this.doc.type(); - } - - public String id() { - return this.doc.id(); - } - - public String routing() { - return this.doc.routing(); - } - - public long timestamp() { - return this.doc.timestamp(); - } - - public long ttl() { - return this.doc.ttl(); - } - - public long version() { - return this.version; - } - - public Create version(long version) { - this.version = version; - this.doc.version().setLongValue(version); - return this; - } - - public VersionType versionType() { - return this.versionType; - } - - public Create versionType(VersionType versionType) { - this.versionType = versionType; - return this; - } - - public String parent() { - return this.doc.parent(); - } - - @Override - public List docs() { - return this.doc.docs(); - } - - public Analyzer analyzer() { - return this.doc.analyzer(); - } - - public BytesReference source() { - return this.doc.source(); - } - - public Create startTime(long startTime) { - this.startTime = startTime; - return this; - } - - /** - * Returns operation start time in nanoseconds. - */ - public long startTime() { - return this.startTime; - } - - public Create endTime(long endTime) { - this.endTime = endTime; - return this; - } - - /** - * Returns operation end time in nanoseconds. - */ - public long endTime() { - return this.endTime; - } } - static class Index implements IndexingOperation { - private final DocumentMapper docMapper; - private final Term uid; - private final ParsedDocument doc; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; - private Origin origin = Origin.PRIMARY; + static final class Index extends IndexingOperation { private boolean created; - private long startTime; - private long endTime; - - public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) { - this.docMapper = docMapper; - this.uid = uid; - this.doc = doc; + public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) { + super(docMapper, uid, doc, version, versionType, origin, startTime); } - @Override - public DocumentMapper docMapper() { - return this.docMapper; + public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) { + super(docMapper, uid, doc); } @Override @@ -540,108 +519,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return Type.INDEX; } - public Index origin(Origin origin) { - this.origin = origin; - return this; - } - - @Override - public Origin origin() { - return this.origin; - } - - public Term uid() { - return this.uid; - } - - @Override - public ParsedDocument parsedDoc() { - return this.doc; - } - - public Index version(long version) { - this.version = version; - doc.version().setLongValue(version); - return this; - } - - /** - * before indexing holds the version requested, after indexing holds the new version of the document. - */ - public long version() { - return this.version; - } - - public Index versionType(VersionType versionType) { - this.versionType = versionType; - return this; - } - - public VersionType versionType() { - return this.versionType; - } - - @Override - public List docs() { - return this.doc.docs(); - } - - public Analyzer analyzer() { - return this.doc.analyzer(); - } - - public String id() { - return this.doc.id(); - } - - public String type() { - return this.doc.type(); - } - - public String routing() { - return this.doc.routing(); - } - - public String parent() { - return this.doc.parent(); - } - - public long timestamp() { - return this.doc.timestamp(); - } - - public long ttl() { - return this.doc.ttl(); - } - - public BytesReference source() { - return this.doc.source(); - } - - public Index startTime(long startTime) { - this.startTime = startTime; - return this; - } - - /** - * Returns operation start time in nanoseconds. - */ - public long startTime() { - return this.startTime; - } - - public Index endTime(long endTime) { - this.endTime = endTime; - return this; - } - - /** - * Returns operation end time in nanoseconds. - */ - public long endTime() { - return this.endTime; - } - /** * @return true if object was created */ @@ -658,18 +535,31 @@ public interface Engine extends IndexShardComponent, CloseableComponent { private final String type; private final String id; private final Term uid; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; - private Origin origin = Origin.PRIMARY; + private long version; + private final VersionType versionType; + private final Origin origin; private boolean found; - private long startTime; + private final long startTime; private long endTime; - public Delete(String type, String id, Term uid) { + public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) { this.type = type; this.id = id; this.uid = uid; + this.version = version; + this.versionType = versionType; + this.origin = origin; + this.startTime = startTime; + this.found = found; + } + + public Delete(String type, String id, Term uid) { + this(type, id, uid, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false); + } + + public Delete(Delete template, VersionType versionType) { + this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found()); } @Override @@ -677,11 +567,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return Type.DELETE; } - public Delete origin(Origin origin) { - this.origin = origin; - return this; - } - @Override public Origin origin() { return this.origin; @@ -699,9 +584,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this.uid; } - public Delete version(long version) { + public void updateVersion(long version, boolean found) { this.version = version; - return this; + this.found = found; } /** @@ -711,11 +596,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this.version; } - public Delete versionType(VersionType versionType) { - this.versionType = versionType; - return this; - } - public VersionType versionType() { return this.versionType; } @@ -724,16 +604,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this.found; } - public Delete found(boolean found) { - this.found = found; - return this; - } - - public Delete startTime(long startTime) { - this.startTime = startTime; - return this; - } - /** * Returns operation start time in nanoseconds. */ @@ -741,9 +611,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this.startTime; } - public Delete endTime(long endTime) { + public void endTime(long endTime) { this.endTime = endTime; - return this; } /** @@ -761,18 +630,20 @@ public interface Engine extends IndexShardComponent, CloseableComponent { private final Filter aliasFilter; private final String[] types; private final Filter parentFilter; - private Operation.Origin origin = Operation.Origin.PRIMARY; + private final Operation.Origin origin; - private long startTime; + private final long startTime; private long endTime; - public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, Filter parentFilter, String... types) { + public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, Filter parentFilter, Operation.Origin origin, long startTime, String... types) { this.query = query; this.source = source; this.types = types; this.filteringAliases = filteringAliases; this.aliasFilter = aliasFilter; this.parentFilter = parentFilter; + this.startTime = startTime; + this.origin = origin; } public Query query() { @@ -803,20 +674,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return parentFilter; } - public DeleteByQuery origin(Operation.Origin origin) { - this.origin = origin; - return this; - } - public Operation.Origin origin() { return this.origin; } - public DeleteByQuery startTime(long startTime) { - this.startTime = startTime; - return this; - } - /** * Returns operation start time in nanoseconds. */ diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index e256571c3c0..bacb1bfbede 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -435,7 +435,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin } } - create.version(updatedVersion); + create.updateVersion(updatedVersion); if (create.docs().size() > 1) { writer.addDocuments(create.docs(), create.analyzer()); @@ -495,7 +495,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - index.version(updatedVersion); + index.updateVersion(updatedVersion); if (currentVersion == Versions.NOT_FOUND) { // document does not exists, we can optimize for create index.created(true); @@ -567,16 +567,16 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin if (currentVersion == Versions.NOT_FOUND) { // doc does not exists and no prior deletes - delete.version(updatedVersion).found(false); + delete.updateVersion(updatedVersion, false); Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); } else if (versionValue != null && versionValue.delete()) { // a "delete on delete", in this case, we still increment the version, log it, and return that version - delete.version(updatedVersion).found(false); + delete.updateVersion(updatedVersion, false); Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); } else { - delete.version(updatedVersion).found(true); + delete.updateVersion(updatedVersion, true); writer.deleteDocuments(delete.uid()); Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index edc35d2a4f6..c880343ff9b 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.filter.FilterCacheStats; import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.id.IdCacheStats; @@ -128,19 +129,19 @@ public interface IndexShard extends IndexShardComponent { IndexShardState state(); - Engine.Create prepareCreate(SourceToParse source) throws ElasticsearchException; + Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; ParsedDocument create(Engine.Create create) throws ElasticsearchException; - Engine.Index prepareIndex(SourceToParse source) throws ElasticsearchException; + Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; ParsedDocument index(Engine.Index index) throws ElasticsearchException; - Engine.Delete prepareDelete(String type, String id, long version) throws ElasticsearchException; + Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; void delete(Engine.Delete delete) throws ElasticsearchException; - Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, String... types) throws ElasticsearchException; + Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, Engine.Operation.Origin origin, String... types) throws ElasticsearchException; void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticsearchException; diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index e72662e6e9e..3839149a40e 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.lucene.search.XFilteredQuery; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.filter.FilterCacheStats; @@ -366,11 +367,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Create prepareCreate(SourceToParse source) throws ElasticsearchException { + public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); ParsedDocument doc = docMapper.parse(source); - return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc).startTime(startTime); + return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime); } @Override @@ -387,11 +388,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Index prepareIndex(SourceToParse source) throws ElasticsearchException { + public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); ParsedDocument doc = docMapper.parse(source); - return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc).startTime(startTime); + return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime); } @Override @@ -413,10 +414,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Delete prepareDelete(String type, String id, long version) throws ElasticsearchException { + public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type); - return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id)).version(version).startTime(startTime); + return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id), version, versionType, origin, startTime, false); } @Override @@ -437,7 +438,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, String... types) throws ElasticsearchException { + public Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, Engine.Operation.Origin origin, String... types) throws ElasticsearchException { long startTime = System.nanoTime(); if (types == null) { types = Strings.EMPTY_ARRAY; @@ -447,7 +448,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases); Filter parentFilter = mapperService.hasNested() ? indexCache.filter().cache(NonNestedDocsFilter.INSTANCE) : null; - return new Engine.DeleteByQuery(query, source, filteringAliases, aliasFilter, parentFilter, types).startTime(startTime); + return new Engine.DeleteByQuery(query, source, filteringAliases, aliasFilter, parentFilter, origin, startTime, types); } @Override @@ -743,28 +744,26 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I switch (operation.opType()) { case CREATE: Translog.Create create = (Translog.Create) operation; - engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id()) - .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())) - .version(create.version()).versionType(create.versionType().versionTypeForReplicationAndRecovery()) - .origin(Engine.Operation.Origin.RECOVERY)); + engine.create(prepareCreate( + source(create.source()).type(create.type()).id(create.id()) + .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()), + create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY)); break; case SAVE: Translog.Index index = (Translog.Index) operation; engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()) - .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())) - .version(index.version()).versionType(index.versionType().versionTypeForReplicationAndRecovery()) - .origin(Engine.Operation.Origin.RECOVERY)); + .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()), + index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY)); break; case DELETE: Translog.Delete delete = (Translog.Delete) operation; Uid uid = Uid.createUid(delete.uid().text()); - engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid()) - .version(delete.version()).versionType(delete.versionType().versionTypeForReplicationAndRecovery()) - .origin(Engine.Operation.Origin.RECOVERY)); + engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(), + delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false)); break; case DELETE_BY_QUERY: Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation; - engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()).origin(Engine.Operation.Origin.RECOVERY)); + engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), Engine.Operation.Origin.RECOVERY, deleteByQuery.types())); break; default: throw new ElasticsearchIllegalStateException("No operation defined for [" + operation + "]"); diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index cbcae6c2827..a2b4033e741 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -83,6 +84,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.hamcrest.Matchers.*; @@ -727,8 +729,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.create(create); assertThat(create.version(), equalTo(1l)); - create = new Engine.Create(null, newUid("1"), doc).version(create.version()) - .versionType(create.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA); + create = new Engine.Create(null, newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.create(create); assertThat(create.version(), equalTo(1l)); } @@ -736,12 +737,11 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testExternalVersioningNewCreate() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); - Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); + Engine.Create create = new Engine.Create(null, newUid("1"), doc, 12, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, 0); engine.create(create); assertThat(create.version(), equalTo(12l)); - create = new Engine.Create(null, newUid("1"), doc).version(create.version()) - .versionType(create.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA); + create = new Engine.Create(null, newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.create(create); assertThat(create.version(), equalTo(12l)); } @@ -753,8 +753,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.index(index); assertThat(index.version(), equalTo(1l)); - index = new Engine.Index(null, newUid("1"), doc).version(index.version()) - .versionType(index.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.index(index); assertThat(index.version(), equalTo(1l)); } @@ -762,12 +761,11 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testExternalVersioningNewIndex() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); - Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); + Engine.Index index = new Engine.Index(null, newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0); engine.index(index); assertThat(index.version(), equalTo(12l)); - index = new Engine.Index(null, newUid("1"), doc) - .version(index.version()).versionType(index.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.index(index); assertThat(index.version(), equalTo(12l)); } @@ -783,7 +781,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.index(index); assertThat(index.version(), equalTo(2l)); - index = new Engine.Index(null, newUid("1"), doc).version(1l); + index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0); try { engine.index(index); fail(); @@ -792,7 +790,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // future versions should not work as well - index = new Engine.Index(null, newUid("1"), doc).version(3l); + index = new Engine.Index(null, newUid("1"), doc, 3l, VersionType.INTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -804,15 +802,15 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testExternalVersioningIndexConflict() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); - Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); + Engine.Index index = new Engine.Index(null, newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0); engine.index(index); assertThat(index.version(), equalTo(12l)); - index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14); + index = new Engine.Index(null, newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0); engine.index(index); assertThat(index.version(), equalTo(14l)); - index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13l); + index = new Engine.Index(null, newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -834,7 +832,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.flush(new Engine.Flush()); - index = new Engine.Index(null, newUid("1"), doc).version(1l); + index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -843,7 +841,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // future versions should not work as well - index = new Engine.Index(null, newUid("1"), doc).version(3l); + index = new Engine.Index(null, newUid("1"), doc, 3l, VersionType.INTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -855,17 +853,17 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testExternalVersioningIndexConflictWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); - Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); + Engine.Index index = new Engine.Index(null, newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0); engine.index(index); assertThat(index.version(), equalTo(12l)); - index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14); + index = new Engine.Index(null, newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0); engine.index(index); assertThat(index.version(), equalTo(14l)); engine.flush(new Engine.Flush()); - index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13); + index = new Engine.Index(null, newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -885,7 +883,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.index(index); assertThat(index.version(), equalTo(2l)); - Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")).version(1l); + Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1l, VersionType.INTERNAL, PRIMARY, 0, false); try { engine.delete(delete); fail(); @@ -894,7 +892,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // future versions should not work as well - delete = new Engine.Delete("test", "1", newUid("1")).version(3l); + delete = new Engine.Delete("test", "1", newUid("1"), 3l, VersionType.INTERNAL, PRIMARY, 0, false); try { engine.delete(delete); fail(); @@ -903,12 +901,12 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // now actually delete - delete = new Engine.Delete("test", "1", newUid("1")).version(2l); + delete = new Engine.Delete("test", "1", newUid("1"), 2l, VersionType.INTERNAL, PRIMARY, 0, false); engine.delete(delete); assertThat(delete.version(), equalTo(3l)); // now check if we can index to a delete doc with version - index = new Engine.Index(null, newUid("1"), doc).version(2l); + index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -917,7 +915,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // we shouldn't be able to create as well - Engine.Create create = new Engine.Create(null, newUid("1"), doc).version(2l); + Engine.Create create = new Engine.Create(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0); try { engine.create(create); } catch (VersionConflictEngineException e) { @@ -938,7 +936,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.flush(new Engine.Flush()); - Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")).version(1l); + Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1l, VersionType.INTERNAL, PRIMARY, 0, false); try { engine.delete(delete); fail(); @@ -947,7 +945,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // future versions should not work as well - delete = new Engine.Delete("test", "1", newUid("1")).version(3l); + delete = new Engine.Delete("test", "1", newUid("1"), 3l, VersionType.INTERNAL, PRIMARY, 0, false); try { engine.delete(delete); fail(); @@ -958,14 +956,14 @@ public class InternalEngineTests extends ElasticsearchTestCase { engine.flush(new Engine.Flush()); // now actually delete - delete = new Engine.Delete("test", "1", newUid("1")).version(2l); + delete = new Engine.Delete("test", "1", newUid("1"), 2l, VersionType.INTERNAL, PRIMARY, 0, false); engine.delete(delete); assertThat(delete.version(), equalTo(3l)); engine.flush(new Engine.Flush()); // now check if we can index to a delete doc with version - index = new Engine.Index(null, newUid("1"), doc).version(2l); + index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0); try { engine.index(index); fail(); @@ -974,7 +972,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { } // we shouldn't be able to create as well - Engine.Create create = new Engine.Create(null, newUid("1"), doc).version(2l); + Engine.Create create = new Engine.Create(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0); try { engine.create(create); } catch (VersionConflictEngineException e) { @@ -985,11 +983,11 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testVersioningCreateExistsException() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); - Engine.Create create = new Engine.Create(null, newUid("1"), doc); + Engine.Create create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0); engine.create(create); assertThat(create.version(), equalTo(1l)); - create = new Engine.Create(null, newUid("1"), doc); + create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0); try { engine.create(create); fail(); @@ -1001,13 +999,13 @@ public class InternalEngineTests extends ElasticsearchTestCase { @Test public void testVersioningCreateExistsExceptionWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); - Engine.Create create = new Engine.Create(null, newUid("1"), doc); + Engine.Create create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0); engine.create(create); assertThat(create.version(), equalTo(1l)); engine.flush(new Engine.Flush()); - create = new Engine.Create(null, newUid("1"), doc); + create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0); try { engine.create(create); fail(); @@ -1028,13 +1026,12 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(index.version(), equalTo(2l)); // apply the second index to the replica, should work fine - index = new Engine.Index(null, newUid("1"), doc).version(index.version()) - .versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, index.version(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.index(index); assertThat(index.version(), equalTo(2l)); // now, the old one should not work - index = new Engine.Index(null, newUid("1"), doc).version(1l).versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); try { replicaEngine.index(index); fail(); @@ -1044,8 +1041,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { // second version on replica should fail as well try { - index = new Engine.Index(null, newUid("1"), doc).version(2l) - .versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, 2l + , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.index(index); assertThat(index.version(), equalTo(2l)); } catch (VersionConflictEngineException e) { @@ -1061,8 +1058,8 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(index.version(), equalTo(1l)); // apply the first index to the replica, should work fine - index = new Engine.Index(null, newUid("1"), doc).version(1l) - .versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, 1l + , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.index(index); assertThat(index.version(), equalTo(1l)); @@ -1077,15 +1074,15 @@ public class InternalEngineTests extends ElasticsearchTestCase { assertThat(delete.version(), equalTo(3l)); // apply the delete on the replica (skipping the second index) - delete = new Engine.Delete("test", "1", newUid("1")).version(3l) - .versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + delete = new Engine.Delete("test", "1", newUid("1"), 3l + , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false); replicaEngine.delete(delete); assertThat(delete.version(), equalTo(3l)); // second time delete with same version should fail try { - delete = new Engine.Delete("test", "1", newUid("1")).version(3l) - .versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + delete = new Engine.Delete("test", "1", newUid("1"), 3l + , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false); replicaEngine.delete(delete); fail("excepted VersionConflictEngineException to be thrown"); } catch (VersionConflictEngineException e) { @@ -1094,8 +1091,7 @@ public class InternalEngineTests extends ElasticsearchTestCase { // now do the second index on the replica, it should fail try { - index = new Engine.Index(null, newUid("1"), doc).version(2l) - .versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); + index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); replicaEngine.index(index); fail("excepted VersionConflictEngineException to be thrown"); } catch (VersionConflictEngineException e) {