Added created flag to index related request classes.

The flag is set to true when a document is new, false when replacing an existing object.

Other minor changes:
Fixed an issue with dynamic gc deletes settings update
Added an assertThrows to ElasticsearchAssertion

Closes #3084 , Closes #3154
This commit is contained in:
Boaz Leskes 2013-05-24 21:33:18 +02:00
parent a2de34eead
commit aa851225e5
18 changed files with 423 additions and 187 deletions

View File

@ -232,7 +232,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
BytesReference indexSourceAsBytes = indexRequest.source(); BytesReference indexSourceAsBytes = indexRequest.source();
// add the response // add the response
IndexResponse indexResponse = result.response(); IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion()); UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(),
indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
updateResponse.setMatches(indexResponse.getMatches()); updateResponse.setMatches(indexResponse.getMatches());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) { if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
@ -258,7 +259,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
case DELETE: case DELETE:
DeleteResponse response = updateResult.writeResult.response(); DeleteResponse response = updateResult.writeResult.response();
DeleteRequest deleteRequest = updateResult.request(); DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
// Replace the update request to the translated delete request to execute on the replica. // Replace the update request to the translated delete request to execute on the replica.
@ -376,17 +377,20 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
long version; long version;
boolean created;
Engine.IndexingOperation op; Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) { if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.index(index); indexShard.index(index);
version = index.version(); version = index.version();
op = index; op = index;
created = index.created();
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.create(create); indexShard.create(create);
version = create.version(); version = create.version();
op = create; op = create;
created = true;
} }
long preVersion = indexRequest.version(); long preVersion = indexRequest.version();
// update the version on request so it will happen on the replicas // update the version on request so it will happen on the replicas
@ -403,7 +407,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
op = null; op = null;
} }
IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version); IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op); return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op);
} }

View File

@ -38,6 +38,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
@ -110,12 +111,12 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override @Override
public void onResponse(IndexDeleteResponse indexDeleteResponse) { public void onResponse(IndexDeleteResponse indexDeleteResponse) {
// go over the response, see if we have found one, and the version if found // go over the response, see if we have found one, and the version if found
long version = 0; long version = Versions.MATCH_ANY;
boolean found = false; boolean found = false;
for (ShardDeleteResponse deleteResponse : indexDeleteResponse.getResponses()) { for (ShardDeleteResponse deleteResponse : indexDeleteResponse.getResponses()) {
if (!deleteResponse.isNotFound()) { if (!deleteResponse.isNotFound()) {
found = true;
version = deleteResponse.getVersion(); version = deleteResponse.getVersion();
found = true;
break; break;
} }
} }

View File

@ -40,17 +40,19 @@ public class IndexResponse extends ActionResponse {
private String id; private String id;
private String type; private String type;
private long version; private long version;
private boolean created;
private List<String> matches; private List<String> matches;
public IndexResponse() { public IndexResponse() {
} }
public IndexResponse(String index, String type, String id, long version) { public IndexResponse(String index, String type, String id, long version, boolean created) {
this.index = index; this.index = index;
this.id = id; this.id = id;
this.type = type; this.type = type;
this.version = version; this.version = version;
this.created = created;
} }
/** /**
@ -75,12 +77,19 @@ public class IndexResponse extends ActionResponse {
} }
/** /**
* Returns the version of the doc indexed. * Returns the current version of the doc indexed.
*/ */
public long getVersion() { public long getVersion() {
return this.version; return this.version;
} }
/**
* Returns true if the document was created, false if updated.
*/
public boolean isCreated() {
return this.created;
}
/** /**
* Returns the percolate queries matches. <tt>null</tt> if no percolation was requested. * Returns the percolate queries matches. <tt>null</tt> if no percolation was requested.
*/ */
@ -102,6 +111,7 @@ public class IndexResponse extends ActionResponse {
id = in.readString(); id = in.readString();
type = in.readString(); type = in.readString();
version = in.readLong(); version = in.readLong();
created = in.readBoolean();
if (in.readBoolean()) { if (in.readBoolean()) {
int size = in.readVInt(); int size = in.readVInt();
if (size == 0) { if (size == 0) {
@ -132,6 +142,7 @@ public class IndexResponse extends ActionResponse {
out.writeString(id); out.writeString(id);
out.writeString(type); out.writeString(type);
out.writeLong(version); out.writeLong(version);
out.writeBoolean(created);
if (matches == null) { if (matches == null) {
out.writeBoolean(false); out.writeBoolean(false);
} else { } else {

View File

@ -198,6 +198,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version; long version;
boolean created;
Engine.IndexingOperation op; Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) { if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse) Engine.Index index = indexShard.prepareIndex(sourceToParse)
@ -207,6 +208,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
indexShard.index(index); indexShard.index(index);
version = index.version(); version = index.version();
op = index; op = index;
created = index.created();
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse) Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version()) .version(request.version())
@ -215,6 +217,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
indexShard.create(create); indexShard.create(create);
version = create.version(); version = create.version();
op = create; op = create;
created = true;
} }
if (request.refresh()) { if (request.refresh()) {
try { try {
@ -229,7 +232,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
// update the version on the request, so it will be used for the replicas // update the version on the request, so it will be used for the replicas
request.version(version); request.version(version);
IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version); IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version, created);
return new PrimaryResponse<IndexResponse, IndexRequest>(shardRequest.request, response, op); return new PrimaryResponse<IndexResponse, IndexRequest>(shardRequest.request, response, op);
} }

View File

@ -193,7 +193,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() { indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override @Override
public void onResponse(IndexResponse response) { public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(),
response.getVersion(), response.isCreated());
update.setMatches(response.getMatches()); update.setMatches(response.getMatches());
if (request.fields() != null && request.fields().length > 0) { if (request.fields() != null && request.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
@ -229,7 +230,8 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() { indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override @Override
public void onResponse(IndexResponse response) { public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(),
response.getVersion(), response.isCreated());
update.setMatches(response.getMatches()); update.setMatches(response.getMatches());
update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
listener.onResponse(update); listener.onResponse(update);
@ -258,7 +260,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() { deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override @Override
public void onResponse(DeleteResponse response) { public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
listener.onResponse(update); listener.onResponse(update);
} }

View File

@ -149,6 +149,7 @@ public class UpdateHelper extends AbstractComponent {
} }
// TODO: external version type, does it make sense here? does not seem like it... // TODO: external version type, does it make sense here? does not seem like it...
// TODO: because we use getResult.getVersion we loose the doc.version. The question is where is the right place?
if (operation == null || "index".equals(operation)) { if (operation == null || "index".equals(operation)) {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType) .source(updatedSourceAsMap, updateSourceContentType)
@ -164,12 +165,12 @@ public class UpdateHelper extends AbstractComponent {
deleteRequest.operationThreaded(false); deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) { } else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null)); update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} else { } else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script); logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} }
} }

View File

@ -37,6 +37,7 @@ public class UpdateResponse extends ActionResponse {
private String id; private String id;
private String type; private String type;
private long version; private long version;
private boolean created;
private List<String> matches; private List<String> matches;
private GetResult getResult; private GetResult getResult;
@ -44,11 +45,12 @@ public class UpdateResponse extends ActionResponse {
} }
public UpdateResponse(String index, String type, String id, long version) { public UpdateResponse(String index, String type, String id, long version, boolean created) {
this.index = index; this.index = index;
this.id = id; this.id = id;
this.type = type; this.type = type;
this.version = version; this.version = version;
this.created = created;
} }
/** /**
@ -73,7 +75,7 @@ public class UpdateResponse extends ActionResponse {
} }
/** /**
* Returns the version of the doc indexed. * Returns the current version of the doc indexed.
*/ */
public long getVersion() { public long getVersion() {
return this.version; return this.version;
@ -94,6 +96,14 @@ public class UpdateResponse extends ActionResponse {
return this.getResult; return this.getResult;
} }
/**
* Returns true if document was created due to an UPSERT operation
*/
public boolean isCreated() {
return this.created;
}
/** /**
* Internal. * Internal.
*/ */
@ -108,6 +118,7 @@ public class UpdateResponse extends ActionResponse {
id = in.readString(); id = in.readString();
type = in.readString(); type = in.readString();
version = in.readLong(); version = in.readLong();
created = in.readBoolean();
if (in.readBoolean()) { if (in.readBoolean()) {
int size = in.readVInt(); int size = in.readVInt();
if (size == 0) { if (size == 0) {
@ -141,6 +152,7 @@ public class UpdateResponse extends ActionResponse {
out.writeString(id); out.writeString(id);
out.writeString(type); out.writeString(type);
out.writeLong(version); out.writeLong(version);
out.writeBoolean(created);
if (matches == null) { if (matches == null) {
out.writeBoolean(false); out.writeBoolean(false);
} else { } else {

View File

@ -32,6 +32,7 @@ import java.util.List;
/** Utility class to resolve the Lucene doc ID and version for a given uid. */ /** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions { public class Versions {
public static final long MATCH_ANY = 0L; // Version was not specified by the user
public static final long NOT_FOUND = -1L; public static final long NOT_FOUND = -1L;
public static final long NOT_SET = -2L; public static final long NOT_SET = -2L;

View File

@ -379,7 +379,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final DocumentMapper docMapper; private final DocumentMapper docMapper;
private final Term uid; private final Term uid;
private final ParsedDocument doc; private final ParsedDocument doc;
private long version; private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY; private Origin origin = Origin.PRIMARY;
@ -506,9 +506,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final DocumentMapper docMapper; private final DocumentMapper docMapper;
private final Term uid; private final Term uid;
private final ParsedDocument doc; private final ParsedDocument doc;
private long version; private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY; private Origin origin = Origin.PRIMARY;
private boolean created;
private long startTime; private long startTime;
private long endTime; private long endTime;
@ -554,6 +555,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this; return this;
} }
/**
* before indexing holds the version requested, after indexing holds the new version of the document.
*/
public long version() { public long version() {
return this.version; return this.version;
} }
@ -627,13 +631,24 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public long endTime() { public long endTime() {
return this.endTime; return this.endTime;
} }
/**
* @return true if object was created
*/
public boolean created() {
return created;
}
public void created(boolean created) {
this.created = created;
}
} }
static class Delete implements Operation { static class Delete implements Operation {
private final String type; private final String type;
private final String id; private final String id;
private final Term uid; private final Term uid;
private long version; private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY; private Origin origin = Origin.PRIMARY;
private boolean notFound; private boolean notFound;
@ -679,6 +694,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this; return this;
} }
/**
* before delete execution this is the version to be deleted. After this is the version of the "delete" transaction record.
*/
public long version() { public long version() {
return this.version; return this.version;
} }
@ -701,7 +719,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this; return this;
} }
public Delete startTime(long startTime) { public Delete startTime(long startTime) {
this.startTime = startTime; this.startTime = startTime;
return this; return this;
@ -836,7 +853,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final Versions.DocIdAndVersion docIdAndVersion; private final Versions.DocIdAndVersion docIdAndVersion;
private final Searcher searcher; private final Searcher searcher;
public static final GetResult NOT_EXISTS = new GetResult(false, -1, null); public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null);
public GetResult(boolean exists, long version, @Nullable Translog.Source source) { public GetResult(boolean exists, long version, @Nullable Translog.Source source) {
this.source = source; this.source = source;

View File

@ -386,7 +386,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
currentVersion = loadCurrentVersionFromIndex(create.uid()); currentVersion = loadCurrentVersionFromIndex(create.uid());
} else { } else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = -1; // deleted, and GC currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else { } else {
currentVersion = versionValue.version(); currentVersion = versionValue.version();
} }
@ -397,18 +397,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (create.origin() == Operation.Origin.PRIMARY) { if (create.origin() == Operation.Origin.PRIMARY) {
if (create.versionType() == VersionType.INTERNAL) { // internal version type if (create.versionType() == VersionType.INTERNAL) { // internal version type
long expectedVersion = create.version(); long expectedVersion = create.version();
if (expectedVersion != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore... if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) {
// an explicit version is provided, see if there is a conflict // an explicit version is provided, see if there is a conflict
// if the current version is -1, means we did not find anything, and // if we did not find anything, and a version is provided, so we do expect to find a doc under that version
// a version is provided, so we do expect to find a doc under that version
// this is important, since we don't allow to preset a version in order to handle deletes // this is important, since we don't allow to preset a version in order to handle deletes
if (currentVersion == -1) { if (currentVersion == Versions.NOT_FOUND) {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), -1, expectedVersion); throw new VersionConflictEngineException(shardId, create.type(), create.id(), Versions.NOT_FOUND, expectedVersion);
} else if (expectedVersion != currentVersion) { } else if (expectedVersion != currentVersion) {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
} }
} }
updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1; updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
} else { // external version type } else { // external version type
// an external version is provided, just check, if a local version exists, that its higher than it // an external version is provided, just check, if a local version exists, that its higher than it
// the actual version checking is one in an external system, and we just want to not index older versions // the actual version checking is one in an external system, and we just want to not index older versions
@ -421,10 +421,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
long expectedVersion = create.version(); long expectedVersion = create.version();
if (currentVersion != -2) { // -2 means we don't have a version, so ignore... if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore...
// if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of)
// then nothing to check // then nothing to check
if (!(currentVersion == -1 && create.version() == 1)) { if (!(currentVersion == Versions.NOT_FOUND && create.version() == 1)) {
// with replicas/recovery, we only check for previous version, we allow to set a future version // with replicas/recovery, we only check for previous version, we allow to set a future version
if (expectedVersion <= currentVersion) { if (expectedVersion <= currentVersion) {
if (create.origin() == Operation.Origin.RECOVERY) { if (create.origin() == Operation.Origin.RECOVERY) {
@ -448,7 +448,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new DocumentAlreadyExistsException(shardId, create.type(), create.id()); throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
} }
} }
} else if (currentVersion != -1) { } else if (currentVersion != Versions.NOT_FOUND) {
// its not deleted, its already there // its not deleted, its already there
if (create.origin() == Operation.Origin.RECOVERY) { if (create.origin() == Operation.Origin.RECOVERY) {
return; return;
@ -509,7 +509,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
currentVersion = loadCurrentVersionFromIndex(index.uid()); currentVersion = loadCurrentVersionFromIndex(index.uid());
} else { } else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = -1; // deleted, and GC currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else { } else {
currentVersion = versionValue.version(); currentVersion = versionValue.version();
} }
@ -519,18 +519,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (index.origin() == Operation.Origin.PRIMARY) { if (index.origin() == Operation.Origin.PRIMARY) {
if (index.versionType() == VersionType.INTERNAL) { // internal version type if (index.versionType() == VersionType.INTERNAL) { // internal version type
long expectedVersion = index.version(); long expectedVersion = index.version();
if (expectedVersion != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore... if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) {
// an explicit version is provided, see if there is a conflict // an explicit version is provided, see if there is a conflict
// if the current version is -1, means we did not find anything, and // if we did not find anything, and a version is provided, so we do expect to find a doc under that version
// a version is provided, so we do expect to find a doc under that version
// this is important, since we don't allow to preset a version in order to handle deletes // this is important, since we don't allow to preset a version in order to handle deletes
if (currentVersion == -1) { if (currentVersion == Versions.NOT_FOUND) {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), -1, expectedVersion); throw new VersionConflictEngineException(shardId, index.type(), index.id(), Versions.NOT_FOUND, expectedVersion);
} else if (expectedVersion != currentVersion) { } else if (expectedVersion != currentVersion) {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
} }
} }
updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1; updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
} else { // external version type } else { // external version type
// an external version is provided, just check, if a local version exists, that its higher than it // an external version is provided, just check, if a local version exists, that its higher than it
// the actual version checking is one in an external system, and we just want to not index older versions // the actual version checking is one in an external system, and we just want to not index older versions
@ -543,10 +543,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
long expectedVersion = index.version(); long expectedVersion = index.version();
if (currentVersion != -2) { // -2 means we don't have a version, so ignore... if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore...
// if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of)
// then nothing to check // then nothing to check
if (!(currentVersion == -1 && index.version() == 1)) { if (!(currentVersion == Versions.NOT_FOUND && index.version() == 1)) {
// with replicas/recovery, we only check for previous version, we allow to set a future version // with replicas/recovery, we only check for previous version, we allow to set a future version
if (expectedVersion <= currentVersion) { if (expectedVersion <= currentVersion) {
if (index.origin() == Operation.Origin.RECOVERY) { if (index.origin() == Operation.Origin.RECOVERY) {
@ -562,15 +562,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
index.version(updatedVersion); index.version(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
if (currentVersion == -1) {
// document does not exists, we can optimize for create // document does not exists, we can optimize for create
index.created(true);
if (index.docs().size() > 1) { if (index.docs().size() > 1) {
writer.addDocuments(index.docs(), index.analyzer()); writer.addDocuments(index.docs(), index.analyzer());
} else { } else {
writer.addDocument(index.docs().get(0), index.analyzer()); writer.addDocument(index.docs().get(0), index.analyzer());
} }
} else { } else {
if (versionValue != null) index.created(versionValue.delete()); // we have a delete which is not GC'ed...
if (index.docs().size() > 1) { if (index.docs().size() > 1) {
writer.updateDocuments(index.uid(), index.docs(), index.analyzer()); writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else { } else {
@ -621,7 +622,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
currentVersion = loadCurrentVersionFromIndex(delete.uid()); currentVersion = loadCurrentVersionFromIndex(delete.uid());
} else { } else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = -1; // deleted, and GC currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else { } else {
currentVersion = versionValue.version(); currentVersion = versionValue.version();
} }
@ -630,21 +631,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
long updatedVersion; long updatedVersion;
if (delete.origin() == Operation.Origin.PRIMARY) { if (delete.origin() == Operation.Origin.PRIMARY) {
if (delete.versionType() == VersionType.INTERNAL) { // internal version type if (delete.versionType() == VersionType.INTERNAL) { // internal version type
if (delete.version() != 0 && currentVersion != -2) { // -2 means we don't have a version, so ignore... if (delete.version() != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore...
// an explicit version is provided, see if there is a conflict // an explicit version is provided, see if there is a conflict
// if the current version is -1, means we did not find anything, and // if we did not find anything and a version is provided, so we do expect to find a doc under that version
// a version is provided, so we do expect to find a doc under that version if (currentVersion == Versions.NOT_FOUND) {
if (currentVersion == -1) { throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), Versions.NOT_FOUND, delete.version());
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), -1, delete.version());
} else if (delete.version() != currentVersion) { } else if (delete.version() != currentVersion) {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version()); throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version());
} }
} }
updatedVersion = currentVersion < 0 ? 1 : currentVersion + 1; updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
} else { // External } else { // External
if (currentVersion == -1) { if (currentVersion == Versions.NOT_FOUND) {
// its an external version, that's fine, we allow it to be set // its an external version, that's fine, we allow it to be set
//throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), -1, delete.version()); //throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), UidField.DocIdAndVersion.Versions.NOT_FOUND, delete.version());
} else if (currentVersion >= delete.version()) { } else if (currentVersion >= delete.version()) {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version()); throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version());
} }
@ -652,9 +653,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} }
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
// on replica, the version is the future value expected (returned from the operation on the primary) // on replica, the version is the future value expected (returned from the operation on the primary)
if (currentVersion != -2) { // -2 means we don't have a version in the index, ignore if (currentVersion != Versions.NOT_SET) { // we don't have a version in the index, ignore
// only check if we have a version for it, otherwise, ignore (see later) // only check if we have a version for it, otherwise, ignore (see later)
if (currentVersion != -1) { if (currentVersion != Versions.NOT_FOUND) {
// with replicas, we only check for previous version, we allow to set a future version // with replicas, we only check for previous version, we allow to set a future version
if (delete.version() <= currentVersion) { if (delete.version() <= currentVersion) {
if (delete.origin() == Operation.Origin.RECOVERY) { if (delete.origin() == Operation.Origin.RECOVERY) {
@ -669,7 +670,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
updatedVersion = delete.version(); updatedVersion = delete.version();
} }
if (currentVersion == -1) { if (currentVersion == Versions.NOT_FOUND) {
// doc does not exists and no prior deletes // doc does not exists and no prior deletes
delete.version(updatedVersion).notFound(true); delete.version(updatedVersion).notFound(true);
Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
@ -1357,7 +1358,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
class ApplySettings implements IndexSettingsService.Listener { class ApplySettings implements IndexSettingsService.Listener {
@Override @Override
public void onRefreshSettings(Settings settings) { public void onRefreshSettings(Settings settings) {
long gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis)).millis(); long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis)).millis();
if (gcDeletesInMillis != RobinEngine.this.gcDeletesInMillis) { if (gcDeletesInMillis != RobinEngine.this.gcDeletesInMillis) {
logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(RobinEngine.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
RobinEngine.this.gcDeletesInMillis = gcDeletesInMillis; RobinEngine.this.gcDeletesInMillis = gcDeletesInMillis;

View File

@ -124,7 +124,7 @@ public class RestIndexAction extends BaseRestHandler {
} }
builder.endObject(); builder.endObject();
RestStatus status = OK; RestStatus status = OK;
if (response.getVersion() == 1) { if (response.isCreated()) {
status = CREATED; status = CREATED;
} }
channel.sendResponse(new XContentRestResponse(request, status, builder)); channel.sendResponse(new XContentRestResponse(request, status, builder));

View File

@ -149,7 +149,7 @@ public class RestUpdateAction extends BaseRestHandler {
} }
builder.endObject(); builder.endObject();
RestStatus status = OK; RestStatus status = OK;
if (response.getVersion() == 1) { if (response.isCreated()) {
status = CREATED; status = CREATED;
} }
channel.sendResponse(new XContentRestResponse(request, status, builder)); channel.sendResponse(new XContentRestResponse(request, status, builder));

View File

@ -18,20 +18,22 @@
*/ */
package org.elasticsearch.test.hamcrest; package org.elasticsearch.test.hamcrest;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import java.util.Arrays;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import java.util.Arrays;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/** /**
* *
*/ */
@ -128,4 +130,22 @@ public class ElasticsearchAssertions {
return (T)q.getClauses()[i].getQuery(); return (T)q.getClauses()[i].getQuery();
} }
public static <E extends Throwable> void assertThrows(ActionFuture future, Class<E> exceptionClass) {
boolean fail=false;
try {
future.actionGet();
fail=true;
}
catch (ElasticSearchException esException) {
assertThat(esException.unwrapCause(), instanceOf(exceptionClass));
}
catch (Throwable e) {
assertThat(e, instanceOf(exceptionClass));
}
// has to be outside catch clause to get a proper message
if (fail)
throw new AssertionError("Expected a " + exceptionClass + " exception to be thrown");
}
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRespon
import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
@ -314,8 +315,12 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
} }
// utils // utils
protected void index(String index, String type, XContentBuilder source) { protected IndexResponse index(String index, String type, XContentBuilder source) {
client().prepareIndex(index, type).setSource(source).execute().actionGet(); return client().prepareIndex(index, type).setSource(source).execute().actionGet();
}
protected IndexResponse index(String index, String type, String id, String field, Object value) {
return client().prepareIndex(index, type, id).setSource(field, value).execute().actionGet();
} }
protected RefreshResponse refresh() { protected RefreshResponse refresh() {

View File

@ -0,0 +1,129 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.indexing;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerArray;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/**
*
*/
public class IndexActionTests extends AbstractSharedClusterTest {
@Test
public void testCreatedFlag() throws Exception {
createIndex("test"); ensureGreen();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
assertTrue(indexResponse.isCreated());
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").execute().actionGet();
assertFalse(indexResponse.isCreated());
client().prepareDelete("test", "type", "1").execute().actionGet();
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").execute().actionGet();
assertTrue(indexResponse.isCreated());
}
@Test
public void testCreatedFlagWithFlush() throws Exception {
createIndex("test"); ensureGreen();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
assertTrue(indexResponse.isCreated());
client().prepareDelete("test", "type", "1").execute().actionGet();
flush();
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").execute().actionGet();
assertTrue(indexResponse.isCreated());
}
@Test
public void testCreatedFlagParallelExecution() throws Exception {
createIndex("test"); ensureGreen();
int threadCount = 20;
final int docCount = 300;
int taskCount = docCount * threadCount;
final AtomicIntegerArray createdCounts = new AtomicIntegerArray(docCount);
ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(taskCount);
final Random random = new Random();
for (int i=0;i< taskCount; i++ ) {
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
int docId = random.nextInt(docCount);
IndexResponse indexResponse = index("test", "type", Integer.toString(docId), "field1", "value");
if (indexResponse.isCreated()) createdCounts.incrementAndGet(docId);
return null;
}
});
}
threadPool.invokeAll(tasks);
for (int i=0;i<docCount;i++) {
assertThat(createdCounts.get(i), lessThanOrEqualTo(1));
}
}
@Test
public void testCreatedFlagWithExternalVersioning() throws Exception {
createIndex("test"); ensureGreen();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(123)
.setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertTrue(indexResponse.isCreated());
}
@Test
public void testCreateFlagWithBulk() {
createIndex("test"); ensureGreen();
BulkResponse bulkResponse = client().prepareBulk().add(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1")).execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(1));
IndexResponse indexResponse = bulkResponse.getItems()[0].getResponse();
assertTrue(indexResponse.isCreated());
}
}

View File

@ -28,6 +28,8 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail; import static org.testng.AssertJUnit.fail;
import java.util.HashMap; import java.util.HashMap;
@ -46,7 +48,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test; import org.testng.annotations.Test;
public class UpdateTests extends AbstractSharedClusterTest { public class UpdateTests extends AbstractSharedClusterTest {
@ -54,6 +55,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
protected void createIndex() throws Exception { protected void createIndex() throws Exception {
logger.info("--> creating index test"); logger.info("--> creating index test");
client().admin().indices().prepareCreate("test") client().admin().indices().prepareCreate("test")
.addMapping("type1", XContentFactory.jsonBuilder() .addMapping("type1", XContentFactory.jsonBuilder()
.startObject() .startObject()
@ -145,20 +147,23 @@ public class UpdateTests extends AbstractSharedClusterTest {
assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
client().prepareUpdate("test", "type1", "1") UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsertRequest(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setUpsertRequest(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject())
.setScript("ctx._source.field += 1") .setScript("ctx._source.field += 1")
.execute().actionGet(); .execute().actionGet();
assertTrue(updateResponse.isCreated());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("1")); assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("1"));
} }
client().prepareUpdate("test", "type1", "1") updateResponse = client().prepareUpdate("test", "type1", "1")
.setUpsertRequest(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) .setUpsertRequest(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject())
.setScript("ctx._source.field += 1") .setScript("ctx._source.field += 1")
.execute().actionGet(); .execute().actionGet();
assertFalse(updateResponse.isCreated());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
@ -231,6 +236,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx._source.field += 1").execute().actionGet(); UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx._source.field += 1").execute().actionGet();
assertThat(updateResponse.getVersion(), equalTo(2L)); assertThat(updateResponse.getVersion(), equalTo(2L));
assertFalse(updateResponse.isCreated());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
@ -239,6 +245,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx._source.field += count").addScriptParam("count", 3).execute().actionGet(); updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx._source.field += count").addScriptParam("count", 3).execute().actionGet();
assertThat(updateResponse.getVersion(), equalTo(3L)); assertThat(updateResponse.getVersion(), equalTo(3L));
assertFalse(updateResponse.isCreated());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
@ -248,6 +255,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
// check noop // check noop
updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx.op = 'none'").execute().actionGet(); updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx.op = 'none'").execute().actionGet();
assertThat(updateResponse.getVersion(), equalTo(3L)); assertThat(updateResponse.getVersion(), equalTo(3L));
assertFalse(updateResponse.isCreated());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();
@ -257,6 +265,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
// check delete // check delete
updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx.op = 'delete'").execute().actionGet(); updateResponse = client().prepareUpdate("test", "type1", "1").setScript("ctx.op = 'delete'").execute().actionGet();
assertThat(updateResponse.getVersion(), equalTo(4L)); assertThat(updateResponse.getVersion(), equalTo(4L));
assertFalse(updateResponse.isCreated());
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet(); GetResponse getResponse = client().prepareGet("test", "type1", "1").execute().actionGet();

View File

@ -19,56 +19,51 @@
package org.elasticsearch.test.integration.versioning; package org.elasticsearch.test.integration.versioning;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.HashMap;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
/** /**
* *
*/ */
public class SimpleVersioningTests extends AbstractSharedClusterTest { public class SimpleVersioningTests extends AbstractSharedClusterTest {
@Test @Test
public void testExternalVersioningInitialDelete() throws Exception { public void testExternalVersioningInitialDelete() throws Exception {
client().admin().indices().prepareDelete().execute().actionGet(); createIndex("test"); ensureGreen();
client().admin().indices().prepareCreate("test").execute().actionGet();
client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();
// Note - external version doesn't throw version conflicts on deletes of non existent records. This is different from internal versioning
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet(); DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(deleteResponse.isNotFound(), equalTo(true)); assertThat(deleteResponse.isNotFound(), equalTo(true));
try { // this should conflict with the delete command transaction which told us that the object was deleted at version 17.
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThrows(
} catch (ElasticSearchException e) { client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute(),
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); VersionConflictEngineException.class
} );
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(18).setVersionType(VersionType.EXTERNAL).execute().actionGet(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(18).
setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(18L));
} }
@Test @Test
public void testExternalVersioning() throws Exception { public void testExternalVersioning() throws Exception {
try { createIndex("test"); ensureGreen();
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (IndexMissingException e) {
// its ok
}
client().admin().indices().prepareCreate("test").execute().actionGet();
client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(12).setVersionType(VersionType.EXTERNAL).execute().actionGet(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(12).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(12l)); assertThat(indexResponse.getVersion(), equalTo(12l));
@ -76,41 +71,73 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest {
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(14).setVersionType(VersionType.EXTERNAL).execute().actionGet(); indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(14).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(14l)); assertThat(indexResponse.getVersion(), equalTo(14l));
try { assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute(),
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute().actionGet(); VersionConflictEngineException.class);
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
client().admin().indices().prepareRefresh().execute().actionGet(); client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(14l)); assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(14l));
} }
// deleting with a lower version fails.
assertThrows(
client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
VersionConflictEngineException.class);
// Delete with a higher version deletes all versions up to the given one.
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet(); DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(17).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(deleteResponse.isNotFound(), equalTo(false)); assertThat(deleteResponse.isNotFound(), equalTo(false));
assertThat(deleteResponse.getVersion(), equalTo(17l)); assertThat(deleteResponse.getVersion(), equalTo(17l));
try { // Deleting with a lower version keeps on failing after a delete.
client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute().actionGet(); assertThrows(
} catch (ElasticSearchException e) { client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(),
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); VersionConflictEngineException.class);
}
// But delete with a higher version is OK.
deleteResponse = client().prepareDelete("test", "type", "1").setVersion(18).setVersionType(VersionType.EXTERNAL).execute().actionGet(); deleteResponse = client().prepareDelete("test", "type", "1").setVersion(18).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(deleteResponse.isNotFound(), equalTo(true)); assertThat(deleteResponse.isNotFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(18l)); assertThat(deleteResponse.getVersion(), equalTo(18l));
// TODO: This behavior breaks rest api returning http status 201, good news is that it this is only the case until deletes GC kicks in.
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(19).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(19l));
deleteResponse = client().prepareDelete("test", "type", "1").setVersion(20).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(deleteResponse.isNotFound(), equalTo(false));
assertThat(deleteResponse.getVersion(), equalTo(20l));
// Make sure that the next delete will be GC. Note we do it on the index settings so it will be cleaned up
HashMap<String,Object> newSettings = new HashMap<String, Object>();
newSettings.put("index.gc_deletes",-1);
client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet();
Thread.sleep(300); // gc works based on estimated sampled time. Give it a chance...
// And now we have previous version return -1
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(20).setVersionType(VersionType.EXTERNAL).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(20l));
} }
@Test @Test
public void testSimpleVersioning() throws Exception { public void testInternalVersioningInitialDelete() throws Exception {
try { createIndex("test"); ensureGreen();
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (IndexMissingException e) { assertThrows(client().prepareDelete("test", "type", "1").setVersion(17).execute(),
// its ok VersionConflictEngineException.class);
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1")
.setCreate(true).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(1l));
} }
client().admin().indices().prepareCreate("test").execute().actionGet();
client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();
@Test
public void testInternalVersioning() throws Exception {
createIndex("test"); ensureGreen();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(1l)); assertThat(indexResponse.getVersion(), equalTo(1l));
@ -118,41 +145,31 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest {
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(2l)); assertThat(indexResponse.getVersion(), equalTo(2l));
try { assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
} catch (ElasticSearchException e) { VersionConflictEngineException.class);
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try { assertThrows(
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
} catch (ElasticSearchException e) { VersionConflictEngineException.class);
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try { assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
} catch (ElasticSearchException e) { VersionConflictEngineException.class);
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); assertThrows(
} client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
try { assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(),
} catch (ElasticSearchException e) { DocumentAlreadyExistsException.class);
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); assertThrows(
} client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(),
DocumentAlreadyExistsException.class);
try {
client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet();
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try { assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
client().admin().indices().prepareRefresh().execute().actionGet(); client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -168,19 +185,18 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest {
// search without versioning // search without versioning
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet();
assertThat(searchResponse.getHits().getAt(0).version(), equalTo(-1l)); assertThat(searchResponse.getHits().getAt(0).version(), equalTo(Versions.NOT_FOUND));
} }
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet(); DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet();
assertThat(deleteResponse.isNotFound(), equalTo(false)); assertThat(deleteResponse.isNotFound(), equalTo(false));
assertThat(deleteResponse.getVersion(), equalTo(3l)); assertThat(deleteResponse.getVersion(), equalTo(3l));
try { assertThrows(client().prepareDelete("test", "type", "1").setVersion(2).execute(), VersionConflictEngineException.class);
client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet();
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
// This is intricate - the object was deleted but a delete transaction was with the right version. We add another one
// and thus the transcation is increased.
deleteResponse = client().prepareDelete("test", "type", "1").setVersion(3).execute().actionGet(); deleteResponse = client().prepareDelete("test", "type", "1").setVersion(3).execute().actionGet();
assertThat(deleteResponse.isNotFound(), equalTo(true)); assertThat(deleteResponse.isNotFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(4l)); assertThat(deleteResponse.getVersion(), equalTo(4l));
@ -188,13 +204,7 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest {
@Test @Test
public void testSimpleVersioningWithFlush() throws Exception { public void testSimpleVersioningWithFlush() throws Exception {
try { createIndex("test"); ensureGreen();
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (IndexMissingException e) {
// its ok
}
client().admin().indices().prepareCreate("test").execute().actionGet();
client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
assertThat(indexResponse.getVersion(), equalTo(1l)); assertThat(indexResponse.getVersion(), equalTo(1l));
@ -206,41 +216,19 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest {
client().admin().indices().prepareFlush().execute().actionGet(); client().admin().indices().prepareFlush().execute().actionGet();
try { assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); VersionConflictEngineException.class);
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try { assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); VersionConflictEngineException.class);
} catch (ElasticSearchException e) { assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); VersionConflictEngineException.class);
}
try { assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); VersionConflictEngineException.class);
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try { assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try {
client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet();
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
try {
client().prepareDelete("test", "type", "1").setVersion(1).execute().actionGet();
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
}
client().admin().indices().prepareRefresh().execute().actionGet(); client().admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -255,13 +243,7 @@ public class SimpleVersioningTests extends AbstractSharedClusterTest {
@Test @Test
public void testVersioningWithBulk() { public void testVersioningWithBulk() {
try { createIndex("test"); ensureGreen();
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (IndexMissingException e) {
// its ok
}
client().admin().indices().prepareCreate("test").execute().actionGet();
client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();
BulkResponse bulkResponse = client().prepareBulk().add(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1")).execute().actionGet(); BulkResponse bulkResponse = client().prepareBulk().add(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1")).execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false)); assertThat(bulkResponse.hasFailures(), equalTo(false));

View File

@ -30,6 +30,7 @@ import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
@ -72,6 +73,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/** /**
* *
@ -974,6 +977,41 @@ public abstract class AbstractSimpleEngineTests {
} }
} }
@Test
public void testBasicCreatedFlag() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertFalse(index.created());
engine.delete(new Engine.Delete(null, "1", newUid("1")));
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
}
@Test
public void testCreatedFlagAfterFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
engine.delete(new Engine.Delete(null, "1", newUid("1")));
engine.flush(new Engine.Flush());
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
}
protected Term newUid(String id) { protected Term newUid(String id) {
return new Term("_uid", id); return new Term("_uid", id);
} }