Make Create/Update/Delete classes less mutable

Today we use a builder pattern / setters to set relevant information
to Engine#Delete|Create|Index. Yet almost all the values are required
but they are not passed via ctor arguments but via an error prone builder
pattern. If we add a required argument we should see compile errors on that
level to make sure we don't miss any place to set them.

Prerequisite for #5917
This commit is contained in:
Simon Willnauer 2014-04-25 09:32:40 +02:00
parent 908c0d4165
commit b7325d005b
10 changed files with 229 additions and 390 deletions

View File

@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
Engine.IndexingOperation op; Engine.IndexingOperation op;
try { try {
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) { if (index.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
} }
@ -419,7 +419,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
op = index; op = index;
created = index.created(); created = index.created();
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) { if (create.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
} }
@ -443,7 +443,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} }
private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) { private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete); indexShard.delete(delete);
// update the request with the version so it will go to the replicas // update the request with the version so it will go to the replicas
deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
@ -561,14 +561,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse) Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.index(index); indexShard.index(index);
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse) Engine.Create create = indexShard.prepareCreate(sourceToParse,
.version(indexRequest.version()).versionType(indexRequest.versionType()) indexRequest.version(), indexRequest.versionType(),
.origin(Engine.Operation.Origin.REPLICA); Engine.Operation.Origin.REPLICA);
indexShard.create(create); indexShard.create(create);
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -577,8 +575,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else if (item.request() instanceof DeleteRequest) { } else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request(); DeleteRequest deleteRequest = (DeleteRequest) item.request();
try { try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()) Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
.versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.REPLICA);
indexShard.delete(delete); indexShard.delete(delete);
} catch (Throwable e) { } catch (Throwable e) {
// ignore, we are on backup // ignore, we are on backup

View File

@ -185,9 +185,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request; DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete); indexShard.delete(delete);
// update the request with teh version so it will go to the replicas // update the request with teh version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery()); request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
@ -211,8 +209,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request; DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()).versionType(request.versionType()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
.origin(Engine.Operation.Origin.REPLICA);
indexShard.delete(delete); indexShard.delete(delete);

View File

@ -93,8 +93,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request; ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY);
.origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete); indexShard.delete(delete);
// update the version to happen on the replicas // update the version to happen on the replicas
request.version(delete.version()); request.version(delete.version());
@ -116,11 +115,10 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request; ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.REPLICA);
.origin(Engine.Operation.Origin.REPLICA);
// IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version // IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version
delete.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()); delete = new Engine.Delete(delete, VersionType.INTERNAL.versionTypeForReplicationAndRecovery());
assert delete.versionType().validateVersion(delete.version()); assert delete.versionType().validateVersion(delete.version());

View File

@ -119,8 +119,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler, indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays)); pageCacheRecycler, bigArrays));
try { try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types()) Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.PRIMARY, request.types());
.origin(Engine.Operation.Origin.PRIMARY);
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of())); SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
indexShard.deleteByQuery(deleteByQuery); indexShard.deleteByQuery(deleteByQuery);
} finally { } finally {
@ -142,8 +141,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays)); cacheRecycler, pageCacheRecycler, bigArrays));
try { try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types()) Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.REPLICA, request.types());
.origin(Engine.Operation.Origin.REPLICA);
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of())); SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
indexShard.deleteByQuery(deleteByQuery); indexShard.deleteByQuery(deleteByQuery);
} finally { } finally {

View File

@ -191,10 +191,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
boolean created; boolean created;
Engine.IndexingOperation op; Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse) Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
.version(request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) { if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
} }
@ -203,10 +200,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
op = index; op = index;
created = index.created(); created = index.created();
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse) Engine.Create create = indexShard.prepareCreate(sourceToParse,
.version(request.version()) request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) { if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
} }
@ -240,14 +235,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse) Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.index(index); indexShard.index(index);
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse) Engine.Create create = indexShard.prepareCreate(sourceToParse,
.version(request.version()).versionType(request.versionType()) request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
.origin(Engine.Operation.Origin.REPLICA);
indexShard.create(create); indexShard.create(create);
} }
if (request.refresh()) { if (request.refresh()) {

View File

@ -376,163 +376,142 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
Origin origin(); Origin origin();
} }
static interface IndexingOperation extends Operation { static abstract class IndexingOperation implements Operation {
ParsedDocument parsedDoc();
List<Document> docs();
DocumentMapper docMapper();
}
static class Create implements IndexingOperation {
private final DocumentMapper docMapper; private final DocumentMapper docMapper;
private final Term uid; private final Term uid;
private final ParsedDocument doc; private final ParsedDocument doc;
private long version = Versions.MATCH_ANY; private long version;
private VersionType versionType = VersionType.INTERNAL; private final VersionType versionType;
private Origin origin = Origin.PRIMARY; private final Origin origin;
private long startTime; private final long startTime;
private long endTime; private long endTime;
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) { public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
this.docMapper = docMapper; this.docMapper = docMapper;
this.uid = uid; this.uid = uid;
this.doc = doc; this.doc = doc;
this.version = version;
this.versionType = versionType;
this.origin = origin;
this.startTime = startTime;
}
public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
this(docMapper, uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}
public DocumentMapper docMapper() {
return this.docMapper;
} }
@Override @Override
public DocumentMapper docMapper() { public Origin origin() {
return this.docMapper; return this.origin;
}
public ParsedDocument parsedDoc() {
return this.doc;
}
public Term uid() {
return this.uid;
}
public String type() {
return this.doc.type();
}
public String id() {
return this.doc.id();
}
public String routing() {
return this.doc.routing();
}
public long timestamp() {
return this.doc.timestamp();
}
public long ttl() {
return this.doc.ttl();
}
public long version() {
return this.version;
}
public void updateVersion(long version) {
this.version = version;
this.doc.version().setLongValue(version);
}
public VersionType versionType() {
return this.versionType;
}
public String parent() {
return this.doc.parent();
}
public List<Document> docs() {
return this.doc.docs();
}
public Analyzer analyzer() {
return this.doc.analyzer();
}
public BytesReference source() {
return this.doc.source();
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public void endTime(long endTime) {
this.endTime = endTime;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
}
static final class Create extends IndexingOperation {
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
super(docMapper, uid, doc, version, versionType, origin, startTime);
}
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
super(docMapper, uid, doc);
} }
@Override @Override
public Type opType() { public Type opType() {
return Type.CREATE; return Type.CREATE;
} }
public Create origin(Origin origin) {
this.origin = origin;
return this;
}
@Override
public Origin origin() {
return this.origin;
}
@Override
public ParsedDocument parsedDoc() {
return this.doc;
}
public Term uid() {
return this.uid;
}
public String type() {
return this.doc.type();
}
public String id() {
return this.doc.id();
}
public String routing() {
return this.doc.routing();
}
public long timestamp() {
return this.doc.timestamp();
}
public long ttl() {
return this.doc.ttl();
}
public long version() {
return this.version;
}
public Create version(long version) {
this.version = version;
this.doc.version().setLongValue(version);
return this;
}
public VersionType versionType() {
return this.versionType;
}
public Create versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
public String parent() {
return this.doc.parent();
}
@Override
public List<Document> docs() {
return this.doc.docs();
}
public Analyzer analyzer() {
return this.doc.analyzer();
}
public BytesReference source() {
return this.doc.source();
}
public Create startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public Create endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
} }
static class Index implements IndexingOperation { static final class Index extends IndexingOperation {
private final DocumentMapper docMapper;
private final Term uid;
private final ParsedDocument doc;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY;
private boolean created; private boolean created;
private long startTime; public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
private long endTime; super(docMapper, uid, doc, version, versionType, origin, startTime);
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
this.docMapper = docMapper;
this.uid = uid;
this.doc = doc;
} }
@Override public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
public DocumentMapper docMapper() { super(docMapper, uid, doc);
return this.docMapper;
} }
@Override @Override
@ -540,108 +519,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return Type.INDEX; return Type.INDEX;
} }
public Index origin(Origin origin) {
this.origin = origin;
return this;
}
@Override
public Origin origin() {
return this.origin;
}
public Term uid() {
return this.uid;
}
@Override
public ParsedDocument parsedDoc() {
return this.doc;
}
public Index version(long version) {
this.version = version;
doc.version().setLongValue(version);
return this;
}
/**
* before indexing holds the version requested, after indexing holds the new version of the document.
*/
public long version() {
return this.version;
}
public Index versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
public VersionType versionType() {
return this.versionType;
}
@Override
public List<Document> docs() {
return this.doc.docs();
}
public Analyzer analyzer() {
return this.doc.analyzer();
}
public String id() {
return this.doc.id();
}
public String type() {
return this.doc.type();
}
public String routing() {
return this.doc.routing();
}
public String parent() {
return this.doc.parent();
}
public long timestamp() {
return this.doc.timestamp();
}
public long ttl() {
return this.doc.ttl();
}
public BytesReference source() {
return this.doc.source();
}
public Index startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public Index endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
/** /**
* @return true if object was created * @return true if object was created
*/ */
@ -658,18 +535,31 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final String type; private final String type;
private final String id; private final String id;
private final Term uid; private final Term uid;
private long version = Versions.MATCH_ANY; private long version;
private VersionType versionType = VersionType.INTERNAL; private final VersionType versionType;
private Origin origin = Origin.PRIMARY; private final Origin origin;
private boolean found; private boolean found;
private long startTime; private final long startTime;
private long endTime; private long endTime;
public Delete(String type, String id, Term uid) { public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) {
this.type = type; this.type = type;
this.id = id; this.id = id;
this.uid = uid; this.uid = uid;
this.version = version;
this.versionType = versionType;
this.origin = origin;
this.startTime = startTime;
this.found = found;
}
public Delete(String type, String id, Term uid) {
this(type, id, uid, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false);
}
public Delete(Delete template, VersionType versionType) {
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found());
} }
@Override @Override
@ -677,11 +567,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return Type.DELETE; return Type.DELETE;
} }
public Delete origin(Origin origin) {
this.origin = origin;
return this;
}
@Override @Override
public Origin origin() { public Origin origin() {
return this.origin; return this.origin;
@ -699,9 +584,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.uid; return this.uid;
} }
public Delete version(long version) { public void updateVersion(long version, boolean found) {
this.version = version; this.version = version;
return this; this.found = found;
} }
/** /**
@ -711,11 +596,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.version; return this.version;
} }
public Delete versionType(VersionType versionType) {
this.versionType = versionType;
return this;
}
public VersionType versionType() { public VersionType versionType() {
return this.versionType; return this.versionType;
} }
@ -724,16 +604,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.found; return this.found;
} }
public Delete found(boolean found) {
this.found = found;
return this;
}
public Delete startTime(long startTime) {
this.startTime = startTime;
return this;
}
/** /**
* Returns operation start time in nanoseconds. * Returns operation start time in nanoseconds.
*/ */
@ -741,9 +611,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.startTime; return this.startTime;
} }
public Delete endTime(long endTime) { public void endTime(long endTime) {
this.endTime = endTime; this.endTime = endTime;
return this;
} }
/** /**
@ -761,18 +630,20 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final Filter aliasFilter; private final Filter aliasFilter;
private final String[] types; private final String[] types;
private final Filter parentFilter; private final Filter parentFilter;
private Operation.Origin origin = Operation.Origin.PRIMARY; private final Operation.Origin origin;
private long startTime; private final long startTime;
private long endTime; private long endTime;
public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, Filter parentFilter, String... types) { public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, Filter parentFilter, Operation.Origin origin, long startTime, String... types) {
this.query = query; this.query = query;
this.source = source; this.source = source;
this.types = types; this.types = types;
this.filteringAliases = filteringAliases; this.filteringAliases = filteringAliases;
this.aliasFilter = aliasFilter; this.aliasFilter = aliasFilter;
this.parentFilter = parentFilter; this.parentFilter = parentFilter;
this.startTime = startTime;
this.origin = origin;
} }
public Query query() { public Query query() {
@ -803,20 +674,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return parentFilter; return parentFilter;
} }
public DeleteByQuery origin(Operation.Origin origin) {
this.origin = origin;
return this;
}
public Operation.Origin origin() { public Operation.Origin origin() {
return this.origin; return this.origin;
} }
public DeleteByQuery startTime(long startTime) {
this.startTime = startTime;
return this;
}
/** /**
* Returns operation start time in nanoseconds. * Returns operation start time in nanoseconds.
*/ */

View File

@ -435,7 +435,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} }
} }
create.version(updatedVersion); create.updateVersion(updatedVersion);
if (create.docs().size() > 1) { if (create.docs().size() > 1) {
writer.addDocuments(create.docs(), create.analyzer()); writer.addDocuments(create.docs(), create.analyzer());
@ -495,7 +495,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.version(updatedVersion); index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) { if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create // document does not exists, we can optimize for create
index.created(true); index.created(true);
@ -567,16 +567,16 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (currentVersion == Versions.NOT_FOUND) { if (currentVersion == Versions.NOT_FOUND) {
// doc does not exists and no prior deletes // doc does not exists and no prior deletes
delete.version(updatedVersion).found(false); delete.updateVersion(updatedVersion, false);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
} else if (versionValue != null && versionValue.delete()) { } else if (versionValue != null && versionValue.delete()) {
// a "delete on delete", in this case, we still increment the version, log it, and return that version // a "delete on delete", in this case, we still increment the version, log it, and return that version
delete.version(updatedVersion).found(false); delete.updateVersion(updatedVersion, false);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
} else { } else {
delete.version(updatedVersion).found(true); delete.updateVersion(updatedVersion, true);
writer.deleteDocuments(delete.uid()); writer.deleteDocuments(delete.uid());
Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation)); versionMap.put(versionKey, new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.filter.FilterCacheStats; import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.id.IdCacheStats; import org.elasticsearch.index.cache.id.IdCacheStats;
@ -128,19 +129,19 @@ public interface IndexShard extends IndexShardComponent {
IndexShardState state(); IndexShardState state();
Engine.Create prepareCreate(SourceToParse source) throws ElasticsearchException; Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException;
ParsedDocument create(Engine.Create create) throws ElasticsearchException; ParsedDocument create(Engine.Create create) throws ElasticsearchException;
Engine.Index prepareIndex(SourceToParse source) throws ElasticsearchException; Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException;
ParsedDocument index(Engine.Index index) throws ElasticsearchException; ParsedDocument index(Engine.Index index) throws ElasticsearchException;
Engine.Delete prepareDelete(String type, String id, long version) throws ElasticsearchException; Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException;
void delete(Engine.Delete delete) throws ElasticsearchException; void delete(Engine.Delete delete) throws ElasticsearchException;
Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, String... types) throws ElasticsearchException; Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, Engine.Operation.Origin origin, String... types) throws ElasticsearchException;
void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticsearchException; void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticsearchException;

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.lucene.search.XFilteredQuery;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.filter.FilterCacheStats; import org.elasticsearch.index.cache.filter.FilterCacheStats;
@ -366,11 +367,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
@Override @Override
public Engine.Create prepareCreate(SourceToParse source) throws ElasticsearchException { public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source); ParsedDocument doc = docMapper.parse(source);
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc).startTime(startTime); return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
} }
@Override @Override
@ -387,11 +388,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
@Override @Override
public Engine.Index prepareIndex(SourceToParse source) throws ElasticsearchException { public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source); ParsedDocument doc = docMapper.parse(source);
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc).startTime(startTime); return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
} }
@Override @Override
@ -413,10 +414,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
@Override @Override
public Engine.Delete prepareDelete(String type, String id, long version) throws ElasticsearchException { public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id)).version(version).startTime(startTime); return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id), version, versionType, origin, startTime, false);
} }
@Override @Override
@ -437,7 +438,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
} }
@Override @Override
public Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, String... types) throws ElasticsearchException { public Engine.DeleteByQuery prepareDeleteByQuery(BytesReference source, @Nullable String[] filteringAliases, Engine.Operation.Origin origin, String... types) throws ElasticsearchException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
if (types == null) { if (types == null) {
types = Strings.EMPTY_ARRAY; types = Strings.EMPTY_ARRAY;
@ -447,7 +448,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases); Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);
Filter parentFilter = mapperService.hasNested() ? indexCache.filter().cache(NonNestedDocsFilter.INSTANCE) : null; Filter parentFilter = mapperService.hasNested() ? indexCache.filter().cache(NonNestedDocsFilter.INSTANCE) : null;
return new Engine.DeleteByQuery(query, source, filteringAliases, aliasFilter, parentFilter, types).startTime(startTime); return new Engine.DeleteByQuery(query, source, filteringAliases, aliasFilter, parentFilter, origin, startTime, types);
} }
@Override @Override
@ -743,28 +744,26 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
switch (operation.opType()) { switch (operation.opType()) {
case CREATE: case CREATE:
Translog.Create create = (Translog.Create) operation; Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id()) engine.create(prepareCreate(
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())) source(create.source()).type(create.type()).id(create.id())
.version(create.version()).versionType(create.versionType().versionTypeForReplicationAndRecovery()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
.origin(Engine.Operation.Origin.RECOVERY)); create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY));
break; break;
case SAVE: case SAVE:
Translog.Index index = (Translog.Index) operation; Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()) engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())) .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
.version(index.version()).versionType(index.versionType().versionTypeForReplicationAndRecovery()) index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY));
.origin(Engine.Operation.Origin.RECOVERY));
break; break;
case DELETE: case DELETE:
Translog.Delete delete = (Translog.Delete) operation; Translog.Delete delete = (Translog.Delete) operation;
Uid uid = Uid.createUid(delete.uid().text()); Uid uid = Uid.createUid(delete.uid().text());
engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid()) engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
.version(delete.version()).versionType(delete.versionType().versionTypeForReplicationAndRecovery()) delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false));
.origin(Engine.Operation.Origin.RECOVERY));
break; break;
case DELETE_BY_QUERY: case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation; Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()).origin(Engine.Operation.Origin.RECOVERY)); engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), Engine.Operation.Origin.RECOVERY, deleteByQuery.types()));
break; break;
default: default:
throw new ElasticsearchIllegalStateException("No operation defined for [" + operation + "]"); throw new ElasticsearchIllegalStateException("No operation defined for [" + operation + "]");

View File

@ -36,6 +36,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -83,6 +84,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -727,8 +729,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
create = new Engine.Create(null, newUid("1"), doc).version(create.version()) create = new Engine.Create(null, newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
.versionType(create.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.create(create); replicaEngine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
} }
@ -736,12 +737,11 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test @Test
public void testExternalVersioningNewCreate() { public void testExternalVersioningNewCreate() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Create create = new Engine.Create(null, newUid("1"), doc, 12, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, 0);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(12l)); assertThat(create.version(), equalTo(12l));
create = new Engine.Create(null, newUid("1"), doc).version(create.version()) create = new Engine.Create(null, newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
.versionType(create.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.create(create); replicaEngine.create(create);
assertThat(create.version(), equalTo(12l)); assertThat(create.version(), equalTo(12l));
} }
@ -753,8 +753,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
index = new Engine.Index(null, newUid("1"), doc).version(index.version()) index = new Engine.Index(null, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
.versionType(index.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index); replicaEngine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
} }
@ -762,12 +761,11 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test @Test
public void testExternalVersioningNewIndex() { public void testExternalVersioningNewIndex() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Index index = new Engine.Index(null, newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
index = new Engine.Index(null, newUid("1"), doc) index = new Engine.Index(null, newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
.version(index.version()).versionType(index.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index); replicaEngine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
} }
@ -783,7 +781,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(2l)); assertThat(index.version(), equalTo(2l));
index = new Engine.Index(null, newUid("1"), doc).version(1l); index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -792,7 +790,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// future versions should not work as well // future versions should not work as well
index = new Engine.Index(null, newUid("1"), doc).version(3l); index = new Engine.Index(null, newUid("1"), doc, 3l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -804,15 +802,15 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test @Test
public void testExternalVersioningIndexConflict() { public void testExternalVersioningIndexConflict() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Index index = new Engine.Index(null, newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14); index = new Engine.Index(null, newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(14l)); assertThat(index.version(), equalTo(14l));
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13l); index = new Engine.Index(null, newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -834,7 +832,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
index = new Engine.Index(null, newUid("1"), doc).version(1l); index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -843,7 +841,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// future versions should not work as well // future versions should not work as well
index = new Engine.Index(null, newUid("1"), doc).version(3l); index = new Engine.Index(null, newUid("1"), doc, 3l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -855,17 +853,17 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test @Test
public void testExternalVersioningIndexConflictWithFlush() { public void testExternalVersioningIndexConflictWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(12); Engine.Index index = new Engine.Index(null, newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(12l)); assertThat(index.version(), equalTo(12l));
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(14); index = new Engine.Index(null, newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0);
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(14l)); assertThat(index.version(), equalTo(14l));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
index = new Engine.Index(null, newUid("1"), doc).versionType(VersionType.EXTERNAL).version(13); index = new Engine.Index(null, newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -885,7 +883,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.index(index); engine.index(index);
assertThat(index.version(), equalTo(2l)); assertThat(index.version(), equalTo(2l));
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")).version(1l); Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1l, VersionType.INTERNAL, PRIMARY, 0, false);
try { try {
engine.delete(delete); engine.delete(delete);
fail(); fail();
@ -894,7 +892,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// future versions should not work as well // future versions should not work as well
delete = new Engine.Delete("test", "1", newUid("1")).version(3l); delete = new Engine.Delete("test", "1", newUid("1"), 3l, VersionType.INTERNAL, PRIMARY, 0, false);
try { try {
engine.delete(delete); engine.delete(delete);
fail(); fail();
@ -903,12 +901,12 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// now actually delete // now actually delete
delete = new Engine.Delete("test", "1", newUid("1")).version(2l); delete = new Engine.Delete("test", "1", newUid("1"), 2l, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete); engine.delete(delete);
assertThat(delete.version(), equalTo(3l)); assertThat(delete.version(), equalTo(3l));
// now check if we can index to a delete doc with version // now check if we can index to a delete doc with version
index = new Engine.Index(null, newUid("1"), doc).version(2l); index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -917,7 +915,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// we shouldn't be able to create as well // we shouldn't be able to create as well
Engine.Create create = new Engine.Create(null, newUid("1"), doc).version(2l); Engine.Create create = new Engine.Create(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.create(create); engine.create(create);
} catch (VersionConflictEngineException e) { } catch (VersionConflictEngineException e) {
@ -938,7 +936,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")).version(1l); Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1l, VersionType.INTERNAL, PRIMARY, 0, false);
try { try {
engine.delete(delete); engine.delete(delete);
fail(); fail();
@ -947,7 +945,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// future versions should not work as well // future versions should not work as well
delete = new Engine.Delete("test", "1", newUid("1")).version(3l); delete = new Engine.Delete("test", "1", newUid("1"), 3l, VersionType.INTERNAL, PRIMARY, 0, false);
try { try {
engine.delete(delete); engine.delete(delete);
fail(); fail();
@ -958,14 +956,14 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
// now actually delete // now actually delete
delete = new Engine.Delete("test", "1", newUid("1")).version(2l); delete = new Engine.Delete("test", "1", newUid("1"), 2l, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete); engine.delete(delete);
assertThat(delete.version(), equalTo(3l)); assertThat(delete.version(), equalTo(3l));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
// now check if we can index to a delete doc with version // now check if we can index to a delete doc with version
index = new Engine.Index(null, newUid("1"), doc).version(2l); index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.index(index); engine.index(index);
fail(); fail();
@ -974,7 +972,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
} }
// we shouldn't be able to create as well // we shouldn't be able to create as well
Engine.Create create = new Engine.Create(null, newUid("1"), doc).version(2l); Engine.Create create = new Engine.Create(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.create(create); engine.create(create);
} catch (VersionConflictEngineException e) { } catch (VersionConflictEngineException e) {
@ -985,11 +983,11 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test @Test
public void testVersioningCreateExistsException() { public void testVersioningCreateExistsException() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc); Engine.Create create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
create = new Engine.Create(null, newUid("1"), doc); create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.create(create); engine.create(create);
fail(); fail();
@ -1001,13 +999,13 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test @Test
public void testVersioningCreateExistsExceptionWithFlush() { public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Create create = new Engine.Create(null, newUid("1"), doc); Engine.Create create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
engine.create(create); engine.create(create);
assertThat(create.version(), equalTo(1l)); assertThat(create.version(), equalTo(1l));
engine.flush(new Engine.Flush()); engine.flush(new Engine.Flush());
create = new Engine.Create(null, newUid("1"), doc); create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
try { try {
engine.create(create); engine.create(create);
fail(); fail();
@ -1028,13 +1026,12 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(index.version(), equalTo(2l)); assertThat(index.version(), equalTo(2l));
// apply the second index to the replica, should work fine // apply the second index to the replica, should work fine
index = new Engine.Index(null, newUid("1"), doc).version(index.version()) index = new Engine.Index(null, newUid("1"), doc, index.version(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index); replicaEngine.index(index);
assertThat(index.version(), equalTo(2l)); assertThat(index.version(), equalTo(2l));
// now, the old one should not work // now, the old one should not work
index = new Engine.Index(null, newUid("1"), doc).version(1l).versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
try { try {
replicaEngine.index(index); replicaEngine.index(index);
fail(); fail();
@ -1044,8 +1041,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// second version on replica should fail as well // second version on replica should fail as well
try { try {
index = new Engine.Index(null, newUid("1"), doc).version(2l) index = new Engine.Index(null, newUid("1"), doc, 2l
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
replicaEngine.index(index); replicaEngine.index(index);
assertThat(index.version(), equalTo(2l)); assertThat(index.version(), equalTo(2l));
} catch (VersionConflictEngineException e) { } catch (VersionConflictEngineException e) {
@ -1061,8 +1058,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
// apply the first index to the replica, should work fine // apply the first index to the replica, should work fine
index = new Engine.Index(null, newUid("1"), doc).version(1l) index = new Engine.Index(null, newUid("1"), doc, 1l
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
replicaEngine.index(index); replicaEngine.index(index);
assertThat(index.version(), equalTo(1l)); assertThat(index.version(), equalTo(1l));
@ -1077,15 +1074,15 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(delete.version(), equalTo(3l)); assertThat(delete.version(), equalTo(3l));
// apply the delete on the replica (skipping the second index) // apply the delete on the replica (skipping the second index)
delete = new Engine.Delete("test", "1", newUid("1")).version(3l) delete = new Engine.Delete("test", "1", newUid("1"), 3l
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false);
replicaEngine.delete(delete); replicaEngine.delete(delete);
assertThat(delete.version(), equalTo(3l)); assertThat(delete.version(), equalTo(3l));
// second time delete with same version should fail // second time delete with same version should fail
try { try {
delete = new Engine.Delete("test", "1", newUid("1")).version(3l) delete = new Engine.Delete("test", "1", newUid("1"), 3l
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA); , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false);
replicaEngine.delete(delete); replicaEngine.delete(delete);
fail("excepted VersionConflictEngineException to be thrown"); fail("excepted VersionConflictEngineException to be thrown");
} catch (VersionConflictEngineException e) { } catch (VersionConflictEngineException e) {
@ -1094,8 +1091,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// now do the second index on the replica, it should fail // now do the second index on the replica, it should fail
try { try {
index = new Engine.Index(null, newUid("1"), doc).version(2l) index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index); replicaEngine.index(index);
fail("excepted VersionConflictEngineException to be thrown"); fail("excepted VersionConflictEngineException to be thrown");
} catch (VersionConflictEngineException e) { } catch (VersionConflictEngineException e) {